"""
high level interface to subscriptions
"""
import time
import logging
from threading import Lock
import sys
if sys.version_info.major == 3 and sys.version_info.minor >= 6:
from collections.abc import Iterable
else:
from collections import Iterable
from opcua import ua
from opcua.common import events
from opcua import Node
class SubHandler(object):
    """
    Sample subscription handler, used to receive notifications from the
    server for a subscription.
    It only illustrates the expected interface: any object exposing these
    methods can be used as handler. Every method here does nothing.
    """

    def data_change(self, handle, node, val, attr):
        """
        Deprecated callback for data changes, use datachange_notification.
        """

    def datachange_notification(self, node, val, data):
        """
        Invoked for each datachange notification sent by the server.
        """

    def event_notification(self, event):
        """
        Invoked for each event notification sent by the server.
        """

    def status_change_notification(self, status):
        """
        Invoked for each status change notification sent by the server.
        """
class SubscriptionItemData(object):
    """
    Plain record holding the useful data of one monitored item.
    All fields start as None and are filled in by whoever creates the record.
    """

    def __init__(self):
        # Same attributes, created in the same order, all initialised to None.
        for field in ("node", "client_handle", "server_handle", "attribute", "mfilter"):
            setattr(self, field, None)
class DataChangeNotif(object):
    """
    Object handed to clients for every datachange notification from server.
    It pairs the locally stored subscription data with the monitored item
    notification it was received for.
    """

    def __init__(self, subscription_data, monitored_item):
        self.subscription_data = subscription_data
        self.monitored_item = monitored_item

    def __str__(self):
        template = "DataChangeNotification({0}, {1})"
        return template.format(self.subscription_data, self.monitored_item)

    # repr() and str() deliberately give the same text
    __repr__ = __str__
class Subscription(object):
    """
    Subscription object returned by Server or Client objects.
    The object represents a subscription to an opc-ua server.
    This is a high level class, especially subscribe_data_change
    and subscribe_events methods. If more control is necessary look at
    code and/or use create_monitored_items method.
    """

    def __init__(self, server, params, handler):
        """
        Create the subscription on the server and queue a first publish request.

        :param server: object used for all service calls (create_subscription,
            publish, delete_subscriptions, create/modify/delete_monitored_items)
        :param params: subscription parameters forwarded to create_subscription;
            its RequestedPublishingInterval is reused as sampling interval
            for monitored items created by this class
        :param handler: object receiving the notifications, see SubHandler
        """
        self.logger = logging.getLogger(__name__)
        self.server = server
        # last client handle handed out; incremented before each use
        self._client_handle = 200
        self._handler = handler
        self.parameters = params  # move to data class
        # client handle -> SubscriptionItemData
        self._monitoreditems_map = {}
        # protects _monitoreditems_map and _client_handle; NOT reentrant
        self._lock = Lock()
        self.subscription_id = None
        # set when a notification arrives for a client handle we do not know
        self.has_unknown_handlers = False
        response = self.server.create_subscription(
            params, self.publish_callback, ready_callback=self.ready_callback)
        # Set it here to keep the old behavior as well, but this may not run if
        # the above times out
        self.subscription_id = response.SubscriptionId
        # Send a publish request so the server has one in its queue
        # Servers should always be able to handle at least one extra publish request per subscription
        self.server.publish()

    def delete(self):
        """
        Delete subscription on server. This is automatically done by Client and Server classes on exit
        """
        results = self.server.delete_subscriptions([self.subscription_id])
        results[0].check()

    def is_ready(self):
        """
        Return True once the subscription id is known (and truthy).
        """
        return bool(self.subscription_id)

    def ready_callback(self, response):
        """
        Called with the create-subscription response: store the subscription id
        unless it is already set, then queue a publish request.
        """
        self.subscription_id = self.subscription_id or response.Parameters.SubscriptionId
        self.server.publish()

    def publish_callback(self, publishresult):
        """
        Dispatch every notification of a publish result to the handler, then
        acknowledge the message and queue a new publish request.
        A result received before the subscription id is known is logged and dropped.
        """
        self.logger.info("Publish callback called with result: %s", publishresult)
        if not self.is_ready():
            self.logger.warning(
                "Result received but subscription not ready %s", publishresult)
            return

        notifications = publishresult.NotificationMessage.NotificationData
        if notifications is not None:
            for notif in notifications:
                if isinstance(notif, ua.DataChangeNotification):
                    self._call_datachange(notif)
                elif isinstance(notif, ua.EventNotificationList):
                    self._call_event(notif)
                elif isinstance(notif, ua.StatusChangeNotification):
                    self._call_status(notif)
                else:
                    self.logger.warning("Notification type not supported yet for notification %s", notif)
        else:
            self.logger.warning("NotificationMessage is None.")

        ack = ua.SubscriptionAcknowledgement()
        ack.SubscriptionId = self.subscription_id
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
        self.server.publish([ack])

    def _call_datachange(self, datachange):
        """
        Forward each monitored item of a datachange notification to the handler.
        Items with an unknown client handle are logged, flagged through
        has_unknown_handlers and skipped. Handler exceptions are logged, not raised.
        """
        for item in datachange.MonitoredItems:
            # hold the lock for the lookup only, never while calling user code
            with self._lock:
                if item.ClientHandle not in self._monitoreditems_map:
                    self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
                    self.has_unknown_handlers = True
                    continue
                data = self._monitoreditems_map[item.ClientHandle]
            if hasattr(self._handler, "datachange_notification"):
                event_data = DataChangeNotif(data, item)
                try:
                    self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
                except Exception:
                    self.logger.exception("Exception calling data change handler")
            elif hasattr(self._handler, "data_change"):  # deprecated API
                self.logger.warning("data_change method is deprecated, use datachange_notification")
                try:
                    self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
                except Exception:
                    self.logger.exception("Exception calling deprecated data change handler")
            else:
                self.logger.error("DataChange subscription created but handler has no datachange_notification method")

    def _call_event(self, eventlist):
        """
        Forward each event of an event notification list to the handler.
        Events with an unknown client handle are logged, flagged through
        has_unknown_handlers and skipped, exactly like in _call_datachange.
        Handler exceptions are logged, not raised.
        """
        for event in eventlist.Events:
            with self._lock:
                # An unknown handle must not raise here: the exception would
                # leave publish_callback before the acknowledgement is sent.
                if event.ClientHandle not in self._monitoreditems_map:
                    self.logger.warning("Received a notification for unknown handle: %s", event.ClientHandle)
                    self.has_unknown_handlers = True
                    continue
                data = self._monitoreditems_map[event.ClientHandle]
            result = events.Event.from_event_fields(data.mfilter.SelectClauses, event.EventFields)
            result.server_handle = data.server_handle
            if hasattr(self._handler, "event_notification"):
                try:
                    self._handler.event_notification(result)
                except Exception:
                    self.logger.exception("Exception calling event handler")
            elif hasattr(self._handler, "event"):  # deprecated API
                try:
                    self._handler.event(data.server_handle, result)
                except Exception:
                    self.logger.exception("Exception calling deprecated event handler")
            else:
                self.logger.error("Event subscription created but handler has no event_notification method")

    def _call_status(self, status):
        """
        Forward a status change notification to the handler.
        Any exception (including a missing handler method) is logged, not raised.
        """
        try:
            self._handler.status_change_notification(status.Status)
        except Exception:
            self.logger.exception("Exception calling status change handler")

    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value, queuesize=0):
        """
        Subscribe for data change events for a node or list of nodes.
        default attribute is Value.
        Return a handle which can be used to unsubscribe
        (a list of handles/status codes when an iterable of nodes is given).
        If more control is necessary use create_monitored_items method
        """
        return self._subscribe(nodes, attr, queuesize=queuesize)

    def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtypes=ua.ObjectIds.BaseEventType, evfilter=None, queuesize=0):
        """
        Subscribe to events from a node. Default node is Server node.
        In most servers the server node is the only one you can subscribe to.
        if evtypes is not provided, evtype defaults to BaseEventType
        if evtypes is a list or tuple of custom event types, the events will be filtered to the supplied types
        evtypes is only used when no evfilter is given.
        Return a handle which can be used to unsubscribe
        """
        sourcenode = Node(self.server, sourcenode)
        if evfilter is None:
            if not isinstance(evtypes, (list, tuple)):
                evtypes = [evtypes]
            evtypes = [Node(self.server, evtype) for evtype in evtypes]
            evfilter = events.get_filter_from_event_type(evtypes)
        return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter, queuesize=queuesize)

    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
        """
        Create one monitored item per node.
        For an iterable of nodes, return the list produced by
        create_monitored_items (server handles, or status codes for failures).
        For a single node, return its server handle, or raise through
        StatusCode.check() if its creation failed.
        """
        is_list = isinstance(nodes, Iterable)
        nodes = list(nodes) if is_list else [nodes]
        mirs = [self._make_monitored_item_request(node, attr, mfilter, queuesize) for node in nodes]
        mids = self.create_monitored_items(mirs)
        if is_list:
            return mids
        if isinstance(mids[0], ua.StatusCode):
            mids[0].check()
        return mids[0]

    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
        """
        Build a MonitoredItemCreateRequest in Reporting mode for one node
        attribute, allocating a new client handle for it.
        """
        rv = ua.ReadValueId()
        rv.NodeId = node.nodeid
        rv.AttributeId = attr
        # rv.IndexRange //We leave it null, then the entire array is returned
        mparams = ua.MonitoringParameters()
        with self._lock:
            self._client_handle += 1
            mparams.ClientHandle = self._client_handle
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
        mparams.QueueSize = queuesize
        mparams.DiscardOldest = True
        if mfilter:
            mparams.Filter = mfilter
        mir = ua.MonitoredItemCreateRequest()
        mir.ItemToMonitor = rv
        mir.MonitoringMode = ua.MonitoringMode.Reporting
        mir.RequestedParameters = mparams
        return mir

    def create_monitored_items(self, monitored_items):
        """
        low level method to have full control over subscription parameters
        Client handle must be unique since it will be used as key for internal registration of data
        Return a list with, for each request in order, the server handle
        (MonitoredItemId) on success or the StatusCode on failure.
        """
        params = ua.CreateMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToCreate = monitored_items
        params.TimestampsToReturn = ua.TimestampsToReturn.Both

        # insert monitored items into the map before the request, so a notification
        # arriving before the result is returned already finds its entry
        # server_handle is left as None on purpose as we don't have it yet.
        with self._lock:
            for mi in monitored_items:
                data = SubscriptionItemData()
                data.client_handle = mi.RequestedParameters.ClientHandle
                data.node = Node(self.server, mi.ItemToMonitor.NodeId)
                data.attribute = mi.ItemToMonitor.AttributeId
                # TODO: Either use the filter from request or from response. Here it uses from request, in modify it uses from response
                data.mfilter = mi.RequestedParameters.Filter
                self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
        results = self.server.create_monitored_items(params)

        mids = []
        # process result, add server_handle, or remove it if failed
        with self._lock:
            for idx, result in enumerate(results):
                mi = params.ItemsToCreate[idx]
                if not result.StatusCode.is_good():
                    del self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
                    mids.append(result.StatusCode)
                    continue
                data = self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
                data.server_handle = result.MonitoredItemId
                mids.append(result.MonitoredItemId)
        return mids

    def unsubscribe(self, handle):
        """
        unsubscribe to datachange or events using the handle returned while subscribing
        if you delete subscription, you do not need to unsubscribe
        :param handle: a single server handle (int) or a sequence of server handles
        The local entry of every handle the server reports as successfully
        deleted is removed. As before, only the status of the first handle is
        checked and raised on failure.
        """
        handles = [handle] if isinstance(handle, int) else handle
        if not handles:
            return
        params = ua.DeleteMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.MonitoredItemIds = handles
        results = self.server.delete_monitored_items(params)
        with self._lock:
            # index built once so every requested handle is resolved, not only
            # the first match of a single handle
            client_handles = {
                data.server_handle: client_handle
                for client_handle, data in self._monitoreditems_map.items()
            }
            for server_handle, status in zip(handles, results):
                if status.is_good() and server_handle in client_handles:
                    self._monitoreditems_map.pop(client_handles[server_handle], None)
        results[0].check()

    def modify_monitored_item(self, handle, new_samp_time, new_queuesize=0, mod_filter_val=-1):
        """
        Modify a monitored item.
        :param handle: Handle returned when originally subscribing
        :param new_samp_time: New wanted sample time
        :param new_queuesize: New wanted queuesize, default is 0
        :param mod_filter_val: New deadband filter value; None removes the
            filter, a negative value (the default) keeps the current filter
        :return: Return a Modify Monitored Item Result
        :raises ValueError: if handle does not match a monitored item of this subscription
        """
        with self._lock:
            item_to_change = next(
                (data for data in self._monitoreditems_map.values() if data.server_handle == handle),
                None)
        if item_to_change is None:
            raise ValueError(
                "No monitored item with server handle {0} in this subscription".format(handle))

        if mod_filter_val is None:
            mod_filter = None
        elif mod_filter_val < 0:
            mod_filter = item_to_change.mfilter
        else:
            mod_filter = ua.DataChangeFilter()
            mod_filter.Trigger = ua.DataChangeTrigger(1)  # send notification when status or value change
            mod_filter.DeadbandType = 1
            mod_filter.DeadbandValue = mod_filter_val  # absolute float value or from 0 to 100 for percentage deadband
        modif_item = ua.MonitoredItemModifyRequest()
        modif_item.MonitoredItemId = handle
        modif_item.RequestedParameters = self._modify_monitored_item_request(
            new_queuesize, new_samp_time, mod_filter, item_to_change.client_handle)
        params = ua.ModifyMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToModify.append(modif_item)
        results = self.server.modify_monitored_items(params)
        # the filter stored locally is the one returned by the server
        item_to_change.mfilter = results[0].FilterResult
        return results

    def _modify_monitored_item_request(self, new_queuesize, new_samp_time, mod_filter, client_handle):
        """
        Build the MonitoringParameters of a modify request.
        Only a new local object is filled in, so no locking is needed.
        """
        req_params = ua.MonitoringParameters()
        req_params.ClientHandle = client_handle
        req_params.QueueSize = new_queuesize
        req_params.Filter = mod_filter
        req_params.SamplingInterval = new_samp_time
        return req_params

    def deadband_monitor(self, var, deadband_val, deadbandtype=1, queuesize=0, attr=ua.AttributeIds.Value):
        """
        Method to create a subscription with a Deadband Value.
        Default deadband value type is absolute.
        Return a handle which can be used to unsubscribe
        :param var: Variable to which you want to subscribe
        :param deadband_val: Absolute float value
        :param deadbandtype: Default value is 1 (absolute), change to 2 for percentage deadband
        :param queuesize: Wanted queue size, default is 0
        :param attr: Attribute to monitor, default is Value
        """
        deadband_filter = ua.DataChangeFilter()
        deadband_filter.Trigger = ua.DataChangeTrigger(1)  # send notification when status or value change
        deadband_filter.DeadbandType = deadbandtype
        deadband_filter.DeadbandValue = deadband_val  # absolute float value or from 0 to 100 for percentage deadband
        return self._subscribe(var, attr, deadband_filter, queuesize)

    def reconciliate(self, monitored_items):
        """
        Reconciliate client monitored_items with its server counterpart.
        Monitored items reported by the server whose client handle is not
        known locally are deleted on the server, then has_unknown_handlers is reset.
        :param monitored_items: pair of parallel sequences as reported by the
            server: index 0 holds the server handles, index 1 the client handles
        :return: Number of distinct client handles reported by the server but unknown locally
        """
        server_handles, client_handles = monitored_items[0], monitored_items[1]
        # snapshot under the lock; unsubscribe() takes the (non reentrant) lock itself
        with self._lock:
            known_handles = set(self._monitoreditems_map)
        # find MI still present on the server-side but unknown to this client
        client_handler_to_del = set(client_handles) - known_handles
        server_handler_to_del = [
            server_handle
            for server_handle, client_handle in zip(server_handles, client_handles)
            if client_handle in client_handler_to_del
        ]
        for item in server_handler_to_del:
            try:
                self.unsubscribe(item)
            # fail silently if the MI has already been removed
            except ua.uaerrors.BadMonitoredItemIdInvalid:
                pass
        self.has_unknown_handlers = False
        return len(client_handler_to_del)