Source code for opcua.common.utils

"""
Helper function and classes that do not rely on opcua library.
Helper function and classes depending on ua object are in ua_utils.py
"""

import logging
import os
from concurrent.futures import Future
import functools
import threading
from socket import error as SocketError

try:
    import asyncio
except ImportError:
    import trollius as asyncio


from opcua.ua.uaerrors import UaError


[docs]class ServiceError(UaError): def __init__(self, code): super(ServiceError, self).__init__('UA Service Error') self.code = code
[docs]class NotEnoughData(UaError): pass
[docs]class SocketClosedException(UaError): pass
[docs]class Buffer(object): """ alternative to io.BytesIO making debug easier and added a few conveniance methods """ def __init__(self, data, start_pos=0, size=-1): # self.logger = logging.getLogger(__name__) self._data = data self._cur_pos = start_pos if size == -1: size = len(data) - start_pos self._size = size def __str__(self): return "Buffer(size:{0}, data:{1})".format( self._size, self._data[self._cur_pos:self._cur_pos + self._size]) __repr__ = __str__ def __len__(self): return self._size
[docs] def read(self, size): """ read and pop number of bytes for buffer """ if size > self._size: raise NotEnoughData("Not enough data left in buffer, request for {0}, we have {1}".format(size, self)) # self.logger.debug("Request for %s bytes, from %s", size, self) self._size -= size pos = self._cur_pos self._cur_pos += size data = self._data[pos:self._cur_pos] # self.logger.debug("Returning: %s ", data) return data
[docs] def copy(self, size=-1): """ return a shadow copy, optionnaly only copy 'size' bytes """ if size == -1 or size > self._size: size = self._size return Buffer(self._data, self._cur_pos, size)
[docs] def skip(self, size): """ skip size bytes in buffer """ if size > self._size: raise NotEnoughData("Not enough data left in buffer, request for {0}, we have {1}".format(size, self)) self._size -= size self._cur_pos += size
[docs]class SocketWrapper(object): """ wrapper to make it possible to have same api for normal sockets, socket from asyncio, StringIO, etc.... """ def __init__(self, sock): self.socket = sock
[docs] def read(self, size): """ Receive up to size bytes from socket """ data = b'' while size > 0: try: chunk = self.socket.recv(size) except (OSError, SocketError) as ex: raise SocketClosedException("Server socket has closed", ex) if not chunk: raise SocketClosedException("Server socket has closed") data += chunk size -= len(chunk) return data
[docs] def write(self, data): self.socket.sendall(data)
[docs]def create_nonce(size=32): return os.urandom(size)
[docs]class ThreadLoop(threading.Thread): """ run an asyncio loop in a thread """ def __init__(self): threading.Thread.__init__(self) self.logger = logging.getLogger(__name__) self.loop = None self._cond = threading.Condition()
[docs] def start(self): with self._cond: threading.Thread.start(self) self._cond.wait()
[docs] def run(self): self.logger.debug("Starting subscription thread") self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) with self._cond: self._cond.notify_all() self.loop.run_forever() self.logger.debug("subscription thread ended")
[docs] def create_server(self, proto, hostname, port): return self.loop.create_server(proto, hostname, port)
[docs] def stop(self): """ stop subscription loop, thus the subscription thread """ self.loop.call_soon_threadsafe(self.loop.stop)
[docs] def close(self): self.loop.close() self.loop = None
[docs] def call_soon(self, callback): self.loop.call_soon_threadsafe(callback)
[docs] def call_later(self, delay, callback): """ threadsafe call_later from asyncio """ p = functools.partial(self.loop.call_later, delay, callback) self.loop.call_soon_threadsafe(p)
def _create_task(self, future, coro, cb=None): #task = self.loop.create_task(coro) task = asyncio.ensure_future(coro, loop=self.loop) if cb: task.add_done_callback(cb) future.set_result(task)
[docs] def create_task(self, coro, cb=None): """ threadsafe create_task from asyncio """ future = Future() p = functools.partial(self._create_task, future, coro, cb) self.loop.call_soon_threadsafe(p) return future.result()
[docs] def run_coro_and_wait(self, coro): cond = threading.Condition() def cb(_): with cond: cond.notify_all() with cond: task = self.create_task(coro, cb) cond.wait() return task.result()
def _run_until_complete(self, future, coro): task = self.loop.run_until_complete(coro) future.set_result(task)
[docs] def run_until_complete(self, coro): """ threadsafe run_until_completed from asyncio """ future = Future() p = functools.partial(self._run_until_complete, future, coro) self.loop.call_soon_threadsafe(p) return future.result()