Source code for opcua.server.subscription_service

"""
server side implementation of subscription service
"""

from threading import RLock
import logging

from opcua import ua
from opcua.common import utils
from opcua.server.internal_subscription import InternalSubscription


[docs]class SubscriptionService(object): def __init__(self, aspace): self.logger = logging.getLogger(__name__) self.loop = None self.aspace = aspace self.subscriptions = {} self._sub_id_counter = 77 self._lock = RLock()
[docs] def set_loop(self, loop): self.loop = loop
[docs] def create_subscription(self, params, callback): self.logger.info("create subscription with callback: %s", callback) result = ua.CreateSubscriptionResult() result.RevisedPublishingInterval = params.RequestedPublishingInterval result.RevisedLifetimeCount = params.RequestedLifetimeCount result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount with self._lock: self._sub_id_counter += 1 result.SubscriptionId = self._sub_id_counter sub = InternalSubscription(self, result, self.aspace, callback) sub.start() self.subscriptions[result.SubscriptionId] = sub return result
[docs] def modify_subscription(self, params, callback): # Requested params are ignored, result = params set during create_subscription. self.logger.info("modify subscription with callback: %s", callback) result = ua.ModifySubscriptionResult() try: with self._lock: sub = self.subscriptions[params.SubscriptionId] result.RevisedPublishingInterval = sub.data.RevisedPublishingInterval result.RevisedLifetimeCount = sub.data.RevisedLifetimeCount result.RevisedMaxKeepAliveCount = sub.data.RevisedMaxKeepAliveCount return result except KeyError: raise utils.ServiceError(ua.StatusCodes.BadSubscriptionIdInvalid)
[docs] def delete_subscriptions(self, ids): self.logger.info("delete subscriptions: %s", ids) res = [] for i in ids: with self._lock: if i not in self.subscriptions: res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)) else: sub = self.subscriptions.pop(i) sub.stop() res.append(ua.StatusCode()) return res
[docs] def publish(self, acks): self.logger.info("publish request with acks %s", acks) with self._lock: for subid, sub in self.subscriptions.items(): sub.publish([ack.SequenceNumber for ack in acks if ack.SubscriptionId == subid])
[docs] def create_monitored_items(self, params): self.logger.info("create monitored items") with self._lock: if params.SubscriptionId not in self.subscriptions: res = [] for _ in params.ItemsToCreate: response = ua.MonitoredItemCreateResult() response.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid) res.append(response) return res return self.subscriptions[params.SubscriptionId].monitored_item_srv.create_monitored_items(params)
[docs] def modify_monitored_items(self, params): self.logger.info("modify monitored items") with self._lock: if params.SubscriptionId not in self.subscriptions: res = [] for _ in params.ItemsToModify: result = ua.MonitoredItemModifyResult() result.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid) res.append(result) return res return self.subscriptions[params.SubscriptionId].monitored_item_srv.modify_monitored_items(params)
[docs] def delete_monitored_items(self, params): self.logger.info("delete monitored items") with self._lock: if params.SubscriptionId not in self.subscriptions: res = [] for _ in params.MonitoredItemIds: res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)) return res return self.subscriptions[params.SubscriptionId].monitored_item_srv.delete_monitored_items( params.MonitoredItemIds)
[docs] def republish(self, params): with self._lock: if params.SubscriptionId not in self.subscriptions: # TODO: what should I do? return ua.NotificationMessage() return self.subscriptions[params.SubscriptionId].republish(params.RetransmitSequenceNumber)
[docs] def trigger_event(self, event): with self._lock: for sub in self.subscriptions.values(): sub.monitored_item_srv.trigger_event(event)