"""
server side implementation of subscription service
"""
from threading import RLock
import logging
from opcua import ua
from opcua.common import utils
from opcua.server.internal_subscription import InternalSubscription
[docs]class SubscriptionService(object):
def __init__(self, aspace):
self.logger = logging.getLogger(__name__)
self.loop = None
self.aspace = aspace
self.subscriptions = {}
self._sub_id_counter = 77
self._lock = RLock()
[docs] def set_loop(self, loop):
self.loop = loop
[docs] def create_subscription(self, params, callback):
self.logger.info("create subscription with callback: %s", callback)
result = ua.CreateSubscriptionResult()
result.RevisedPublishingInterval = params.RequestedPublishingInterval
result.RevisedLifetimeCount = params.RequestedLifetimeCount
result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
with self._lock:
self._sub_id_counter += 1
result.SubscriptionId = self._sub_id_counter
sub = InternalSubscription(self, result, self.aspace, callback)
sub.start()
self.subscriptions[result.SubscriptionId] = sub
return result
[docs] def modify_subscription(self, params, callback):
# Requested params are ignored, result = params set during create_subscription.
self.logger.info("modify subscription with callback: %s", callback)
result = ua.ModifySubscriptionResult()
try:
with self._lock:
sub = self.subscriptions[params.SubscriptionId]
result.RevisedPublishingInterval = sub.data.RevisedPublishingInterval
result.RevisedLifetimeCount = sub.data.RevisedLifetimeCount
result.RevisedMaxKeepAliveCount = sub.data.RevisedMaxKeepAliveCount
return result
except KeyError:
raise utils.ServiceError(ua.StatusCodes.BadSubscriptionIdInvalid)
[docs] def delete_subscriptions(self, ids):
self.logger.info("delete subscriptions: %s", ids)
res = []
for i in ids:
with self._lock:
if i not in self.subscriptions:
res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
else:
sub = self.subscriptions.pop(i)
sub.stop()
res.append(ua.StatusCode())
return res
[docs] def publish(self, acks):
self.logger.info("publish request with acks %s", acks)
with self._lock:
for subid, sub in self.subscriptions.items():
sub.publish([ack.SequenceNumber for ack in acks if ack.SubscriptionId == subid])
[docs] def create_monitored_items(self, params):
self.logger.info("create monitored items")
with self._lock:
if params.SubscriptionId not in self.subscriptions:
res = []
for _ in params.ItemsToCreate:
response = ua.MonitoredItemCreateResult()
response.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
res.append(response)
return res
return self.subscriptions[params.SubscriptionId].monitored_item_srv.create_monitored_items(params)
[docs] def modify_monitored_items(self, params):
self.logger.info("modify monitored items")
with self._lock:
if params.SubscriptionId not in self.subscriptions:
res = []
for _ in params.ItemsToModify:
result = ua.MonitoredItemModifyResult()
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
res.append(result)
return res
return self.subscriptions[params.SubscriptionId].monitored_item_srv.modify_monitored_items(params)
[docs] def delete_monitored_items(self, params):
self.logger.info("delete monitored items")
with self._lock:
if params.SubscriptionId not in self.subscriptions:
res = []
for _ in params.MonitoredItemIds:
res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
return res
return self.subscriptions[params.SubscriptionId].monitored_item_srv.delete_monitored_items(
params.MonitoredItemIds)
[docs] def republish(self, params):
with self._lock:
if params.SubscriptionId not in self.subscriptions:
# TODO: what should I do?
return ua.NotificationMessage()
return self.subscriptions[params.SubscriptionId].republish(params.RetransmitSequenceNumber)
[docs] def trigger_event(self, event):
with self._lock:
for sub in self.subscriptions.values():
sub.monitored_item_srv.trigger_event(event)