"""
server side implementation of a subscription object
"""
from threading import RLock
import logging
# import copy
# import traceback
from opcua import ua
[docs]class MonitoredItemData(object):
def __init__(self):
self.client_handle = None
self.callback_handle = None
self.monitored_item_id = None
self.mode = None
self.filter = None
self.mvalue = MonitoredItemValues()
self.where_clause_evaluator = None
self.queue_size = 0
[docs]class MonitoredItemValues(object):
def __init__(self):
self.current_value = None
self.old_value = None
[docs] def set_current_value(self, cur_val):
self.old_value = self.current_value
self.current_value = cur_val
[docs] def get_current_value(self):
return self.current_value
[docs] def get_old_value(self):
return self.old_value
[docs]class MonitoredItemService(object):
"""
implement monitoreditem service for 1 subscription
"""
def __init__(self, isub, aspace):
self.logger = logging.getLogger(__name__ + "." + str(isub.data.SubscriptionId))
self.isub = isub
self.aspace = aspace
self._lock = RLock()
self._monitored_items = {}
self._monitored_events = {}
self._monitored_datachange = {}
self._monitored_item_counter = 111
[docs] def delete_all_monitored_items(self):
self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_items.values()])
[docs] def create_monitored_items(self, params):
results = []
for item in params.ItemsToCreate:
with self._lock:
if item.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
result = self._create_events_monitored_item(item)
else:
result = self._create_data_change_monitored_item(item)
results.append(result)
return results
[docs] def modify_monitored_items(self, params):
results = []
for item in params.ItemsToModify:
results.append(self._modify_monitored_item(item))
return results
[docs] def trigger_datachange(self, handle, nodeid, attr):
self.logger.debug("triggering datachange for handle %s, nodeid %s, and attribute %s", handle, nodeid, attr)
datavalue = self.aspace.get_attribute_value(nodeid, attr)
self.datachange_callback(handle, datavalue)
def _modify_monitored_item(self, params):
with self._lock:
for mdata in self._monitored_items.values():
result = ua.MonitoredItemModifyResult()
if mdata.monitored_item_id == params.MonitoredItemId:
result.RevisedSamplingInterval = params.RequestedParameters.SamplingInterval
result.RevisedQueueSize = params.RequestedParameters.QueueSize
if params.RequestedParameters.Filter is not None:
mdata.filter = params.RequestedParameters.Filter
mdata.queue_size = params.RequestedParameters.QueueSize
return result
result = ua.MonitoredItemModifyResult()
result.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
return result
def _commit_monitored_item(self, result, mdata):
if result.StatusCode.is_good():
self._monitored_items[result.MonitoredItemId] = mdata
def _make_monitored_item_common(self, params):
result = ua.MonitoredItemCreateResult()
result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
result.RevisedQueueSize = params.RequestedParameters.QueueSize
self._monitored_item_counter += 1
result.MonitoredItemId = self._monitored_item_counter
self.logger.debug("Creating MonitoredItem with id %s", result.MonitoredItemId)
mdata = MonitoredItemData()
mdata.mode = params.MonitoringMode
mdata.client_handle = params.RequestedParameters.ClientHandle
mdata.monitored_item_id = result.MonitoredItemId
mdata.queue_size = params.RequestedParameters.QueueSize
mdata.filter = params.RequestedParameters.Filter
return result, mdata
def _create_events_monitored_item(self, params):
self.logger.info("request to subscribe to events for node %s and attribute %s",
params.ItemToMonitor.NodeId,
params.ItemToMonitor.AttributeId)
result, mdata = self._make_monitored_item_common(params)
ev_notify_byte = self.aspace.get_attribute_value(
params.ItemToMonitor.NodeId, ua.AttributeIds.EventNotifier).Value.Value
if ev_notify_byte is None or not ua.ua_binary.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
return result
# result.FilterResult = ua.EventFilterResult() # spec says we can ignore if not error
mdata.where_clause_evaluator = WhereClauseEvaluator(self.logger, self.aspace, mdata.filter.WhereClause)
self._commit_monitored_item(result, mdata)
if params.ItemToMonitor.NodeId not in self._monitored_events:
self._monitored_events[params.ItemToMonitor.NodeId] = []
self._monitored_events[params.ItemToMonitor.NodeId].append(result.MonitoredItemId)
return result
def _create_data_change_monitored_item(self, params):
self.logger.info("request to subscribe to datachange for node %s and attribute %s",
params.ItemToMonitor.NodeId,
params.ItemToMonitor.AttributeId)
result, mdata = self._make_monitored_item_common(params)
result.FilterResult = params.RequestedParameters.Filter
result.StatusCode, handle = self.aspace.add_datachange_callback(
params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId, self.datachange_callback)
self.logger.debug("adding callback return status %s and handle %s", result.StatusCode, handle)
mdata.callback_handle = handle
self._commit_monitored_item(result, mdata)
if result.StatusCode.is_good():
self._monitored_datachange[handle] = result.MonitoredItemId
# force data change event generation
self.trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
return result
[docs] def delete_monitored_items(self, ids):
self.logger.debug("delete monitored items %s", ids)
with self._lock:
results = []
for mid in ids:
results.append(self._delete_monitored_items(mid))
return results
def _delete_monitored_items(self, mid):
if mid not in self._monitored_items:
return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
for k, v in self._monitored_events.items():
if mid in v:
v.remove(mid)
if not v:
self._monitored_events.pop(k)
break
for k, v in self._monitored_datachange.items():
if v == mid:
self.aspace.delete_datachange_callback(k)
self._monitored_datachange.pop(k)
break
self._monitored_items.pop(mid)
return ua.StatusCode()
[docs] def datachange_callback(self, handle, value, error=None):
if error:
self.logger.info("subscription %s: datachange callback called with handle '%s' and erorr '%s'", self,
handle, error)
self.trigger_statuschange(error)
else:
self.logger.info("subscription %s: datachange callback called with handle '%s' and value '%s'", self,
handle, value.Value)
event = ua.MonitoredItemNotification()
with self._lock:
mid = self._monitored_datachange[handle]
mdata = self._monitored_items[mid]
mdata.mvalue.set_current_value(value.Value.Value)
if mdata.filter:
deadband_flag_pass = self.deadband_callback(mdata.mvalue, mdata.filter)
else:
deadband_flag_pass = True
if deadband_flag_pass:
event.ClientHandle = mdata.client_handle
event.Value = value
self.isub.enqueue_datachange_event(mid, event, mdata.queue_size)
[docs] def deadband_callback(self, values, flt):
if flt.DeadbandType == ua.DeadbandType.None_ or values.get_old_value() is None:
return True
elif flt.DeadbandType == ua.DeadbandType.Absolute and \
((abs(values.get_current_value() - values.get_old_value())) > flt.DeadbandValue):
return True
elif flt.DeadbandType == ua.DeadbandType.Percent:
self.logger.warn("DeadbandType Percent is not implemented !")
return True
else:
return False
[docs] def trigger_event(self, event):
with self._lock:
if event.emitting_node not in self._monitored_events:
self.logger.debug("%s has no subscription for events %s from node: %s",
self, event, event.emitting_node)
return False
self.logger.debug("%s has subscription for events %s from node: %s",
self, event, event.emitting_node)
mids = self._monitored_events[event.emitting_node]
for mid in mids:
self._trigger_event(event, mid)
def _trigger_event(self, event, mid):
if mid not in self._monitored_items:
self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s",
mid, event, self)
return
mdata = self._monitored_items[mid]
if not mdata.where_clause_evaluator.eval(event):
self.logger.info("%s, %s, Event %s does not fit WhereClause, not generating event", self, mid, event)
return
fieldlist = ua.EventFieldList()
fieldlist.ClientHandle = mdata.client_handle
fieldlist.EventFields = event.to_event_fields(mdata.filter.SelectClauses)
self.isub.enqueue_event(mid, fieldlist, mdata.queue_size)
[docs] def trigger_statuschange(self, code):
self.isub.enqueue_statuschange(code)
[docs]class InternalSubscription(object):
def __init__(self, subservice, data, addressspace, callback):
self.logger = logging.getLogger(__name__)
self.aspace = addressspace
self.subservice = subservice
self.data = data
self.callback = callback
self.monitored_item_srv = MonitoredItemService(self, addressspace)
self.task = None
self._lock = RLock()
self._triggered_datachanges = {}
self._triggered_events = {}
self._triggered_statuschanges = []
self._notification_seq = 1
self._not_acknowledged_results = {}
self._startup = True
self._keep_alive_count = 0
self._publish_cycles_count = 0
self._stopev = False
def __str__(self):
return "Subscription(id:{0})".format(self.data.SubscriptionId)
[docs] def start(self):
self.logger.debug("starting subscription %s", self.data.SubscriptionId)
if self.data.RevisedPublishingInterval > 0.0:
self._subscription_loop()
[docs] def stop(self):
self.logger.debug("stopping subscription %s", self.data.SubscriptionId)
self._stopev = True
self.monitored_item_srv.delete_all_monitored_items()
def _trigger_publish(self):
if not self._stopev and self.data.RevisedPublishingInterval <= 0.0:
self.subservice.loop.call_soon(self.publish_results)
def _subscription_loop(self):
if not self._stopev:
self.subservice.loop.call_later(self.data.RevisedPublishingInterval / 1000.0, self._sub_loop)
def _sub_loop(self):
if self._stopev:
return
self.publish_results()
self._subscription_loop()
[docs] def has_published_results(self):
with self._lock:
if self._startup or self._triggered_datachanges or self._triggered_events:
return True
if self._keep_alive_count > self.data.RevisedMaxKeepAliveCount:
self.logger.debug("keep alive count %s is > than max keep alive count %s, sending publish event",
self._keep_alive_count, self.data.RevisedMaxKeepAliveCount)
return True
self._keep_alive_count += 1
return False
[docs] def publish_results(self):
if self._publish_cycles_count > self.data.RevisedLifetimeCount:
self.logger.warning("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)",
self, self._publish_cycles_count, self.data.RevisedLifetimeCount)
# FIXME this will never be send since we do not have publish request anyway
self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
self._stopev = True
result = None
with self._lock:
if self.has_published_results():
# FIXME: should we pop a publish request here? or we do not care?
self._publish_cycles_count += 1
result = self._pop_publish_result()
if result is not None:
self.callback(result)
def _pop_publish_result(self):
result = ua.PublishResult()
result.SubscriptionId = self.data.SubscriptionId
self._pop_triggered_datachanges(result)
self._pop_triggered_events(result)
self._pop_triggered_statuschanges(result)
self._keep_alive_count = 0
self._startup = False
result.NotificationMessage.SequenceNumber = self._notification_seq
if len(result.NotificationMessage.NotificationData) != 0:
self._notification_seq += 1
self._not_acknowledged_results[result.NotificationMessage.SequenceNumber] = result
result.MoreNotifications = False
result.AvailableSequenceNumbers = list(self._not_acknowledged_results.keys())
return result
def _pop_triggered_datachanges(self, result):
if self._triggered_datachanges:
notif = ua.DataChangeNotification()
notif.MonitoredItems = [item for sublist in self._triggered_datachanges.values() for item in sublist]
self._triggered_datachanges = {}
self.logger.debug("sending datachanges notification with %s events", len(notif.MonitoredItems))
result.NotificationMessage.NotificationData.append(notif)
def _pop_triggered_events(self, result):
if self._triggered_events:
notif = ua.EventNotificationList()
notif.Events = [item for sublist in self._triggered_events.values() for item in sublist]
self._triggered_events = {}
result.NotificationMessage.NotificationData.append(notif)
self.logger.debug("sending event notification with %s events", len(notif.Events))
def _pop_triggered_statuschanges(self, result):
if self._triggered_statuschanges:
notif = ua.StatusChangeNotification()
notif.Status = self._triggered_statuschanges.pop(0)
result.NotificationMessage.NotificationData.append(notif)
self.logger.debug("sending event notification %s", notif.Status)
[docs] def publish(self, acks):
self.logger.info("publish request with acks %s", acks)
with self._lock:
self._publish_cycles_count = 0
for nb in acks:
self._not_acknowledged_results.pop(nb, None)
[docs] def republish(self, nb):
self.logger.info("re-publish request for ack %s in subscription %s", nb, self)
with self._lock:
result = self._not_acknowledged_results.pop(nb, None)
if result:
self.logger.info("re-publishing ack %s in subscription %s", nb, self)
return result.NotificationMessage
else:
self.logger.info("Error request to re-published non existing ack %s in subscription %s", nb, self)
return ua.NotificationMessage()
[docs] def enqueue_datachange_event(self, mid, eventdata, maxsize):
with self._lock:
self._enqueue_event(mid, eventdata, maxsize, self._triggered_datachanges)
[docs] def enqueue_event(self, mid, eventdata, maxsize):
with self._lock:
self._enqueue_event(mid, eventdata, maxsize, self._triggered_events)
[docs] def enqueue_statuschange(self, code):
self._triggered_statuschanges.append(code)
self._trigger_publish()
def _enqueue_event(self, mid, eventdata, size, queue):
if mid not in queue:
queue[mid] = [eventdata]
self._trigger_publish()
return
if size != 0:
if len(queue[mid]) >= size:
queue[mid].pop(0)
queue[mid].append(eventdata)
[docs]class WhereClauseEvaluator(object):
def __init__(self, logger, aspace, whereclause):
self.logger = logger
self.elements = whereclause.Elements
self._aspace = aspace
[docs] def eval(self, event):
if not self.elements:
return True
# spec says we should only evaluate first element, which may use other elements
try:
res = self._eval_el(0, event)
except Exception as ex:
self.logger.exception("Exception while evaluating WhereClause %s for event %s: %s",
self.elements, event, ex)
return False
return res
def _eval_el(self, index, event):
el = self.elements[index]
# ops = [self._eval_op(op, event) for op in el.FilterOperands]
ops = el.FilterOperands # just to make code more readable
if el.FilterOperator == ua.FilterOperator.Equals:
return self._eval_op(ops[0], event) == self._eval_op(ops[1], event)
elif el.FilterOperator == ua.FilterOperator.IsNull:
return self._eval_op(ops[0], event) is None # FIXME: might be too strict
elif el.FilterOperator == ua.FilterOperator.GreaterThan:
return self._eval_op(ops[0], event) > self._eval_op(ops[1], event)
elif el.FilterOperator == ua.FilterOperator.LessThan:
return self._eval_op(ops[0], event) < self._eval_op(ops[1], event)
elif el.FilterOperator == ua.FilterOperator.GreaterThanOrEqual:
return self._eval_op(ops[0], event) >= self._eval_op(ops[1], event)
elif el.FilterOperator == ua.FilterOperator.LessThanOrEqual:
return self._eval_op(ops[0], event) <= self._eval_op(ops[1], event)
elif el.FilterOperator == ua.FilterOperator.Like:
return self._likeoperator(self._eval_op(ops[0], event), self._eval_op(ops[1], event))
elif el.FilterOperator == ua.FilterOperator.Not:
return not self._eval_op(ops[0], event)
elif el.FilterOperator == ua.FilterOperator.Between:
return self._eval_el(ops[2], event) >= self._eval_op(ops[0], event) >= self._eval_op(ops[1], event)
elif el.FilterOperator == ua.FilterOperator.InList:
return self._eval_op(ops[0], event) in [self._eval_op(op, event) for op in ops[1:]]
elif el.FilterOperator == ua.FilterOperator.And:
return self._eval_op(ops[0], event) and self._eval_op(ops[1], event)
elif el.FilterOperator == ua.FilterOperator.Or:
return self._eval_op(ops[0], event) or self._eval_op(ops[1], event)
elif el.FilterOperator == ua.FilterOperator.Cast:
self.logger.warn("Cast operand not implemented, assuming True")
return True
elif el.FilterOperator == ua.FilterOperator.OfType:
return event.EventType == self._eval_op(ops[0], event)
else:
# TODO: implement missing operators
self.logger.warning("WhereClause not implemented for element: %s", el)
raise NotImplementedError
def _like_operator(self, string, pattern):
raise NotImplementedError
def _eval_op(self, op, event):
# seems spec says we should return Null if issues
if type(op) is ua.ElementOperand:
return self._eval_el(op.Index, event)
elif type(op) is ua.AttributeOperand:
if op.BrowsePath:
return getattr(event, op.BrowsePath.Elements[0].TargetName.Name)
else:
return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
# FIXME: check, this is probably broken
elif type(op) is ua.SimpleAttributeOperand:
if op.BrowsePath:
# we only support depth of 1
return getattr(event, op.BrowsePath[0].Name)
else:
# TODO: write code for index range.... but doe it make any sense
return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
elif type(op) is ua.LiteralOperand:
return op.Value.Value
else:
self.logger.warning("Where clause element % is not of a known type", op)
raise NotImplementedError