/ client_code / messaging.py
messaging.py
1 # SPDX-License-Identifier: MIT 2 # 3 # Copyright (c) 2021 The Anvil Extras project team members listed at 4 # https://github.com/anvilistas/anvil-extras/graphs/contributors 5 # 6 # This software is published at https://github.com/anvilistas/anvil-extras 7 8 from .logging import INFO 9 from .logging import Logger as _Logger 10 from .utils._warnings import warn as _warn 11 12 __version__ = "3.1.0" 13 14 15 _null_logger = _Logger() 16 _null_logger.disabled = True 17 18 19 class Message: 20 def __init__(self, title, content=None): 21 self.title = title 22 self.content = content 23 24 25 class Subscriber: 26 def __init__(self, subscriber, handler): 27 self.subscriber = subscriber 28 self.handler = handler 29 30 31 class Publisher: 32 default_log_level = INFO 33 34 def __init__(self, *, logger: _Logger = None, **kwargs): 35 self.logger = logger or _null_logger 36 self.subscribers = {} 37 self._deprecation_warnings(**kwargs) 38 39 def _deprecation_warnings(self, **kwargs): 40 if "with_logging" in kwargs: 41 _warn( 42 "publisher.with_logging", 43 "with_logging option is deprecated and it will be removed in future versions. Use the logger options instead, passing an instance of logging.Logger", 44 "DEPRECATION WARNING", 45 ) 46 47 def publish(self, channel, title, content=None, **kwargs): 48 self._deprecation_warnings(**kwargs) 49 message = Message(title, content) 50 subscribers = self.subscribers.get(channel, []) 51 for subscriber in subscribers: 52 subscriber.handler(message) 53 self.logger.log( 54 self.default_log_level, 55 f"Published '{message.title}' message on '{channel}' channel to " 56 f"{len(subscribers)} subscriber(s)", 57 ) 58 59 def subscribe(self, channel, subscriber, handler, **kwargs): 60 self._deprecation_warnings(**kwargs) 61 if channel not in self.subscribers: 62 self.subscribers[channel] = [] 63 self.subscribers[channel].append(Subscriber(subscriber, handler)) 64 self.logger.log( 65 self.default_log_level, f"Added subscriber to {channel} channel" 66 ) 67 68 def unsubscribe(self, channel, subscriber, **kwargs): 69 self._deprecation_warnings(**kwargs) 70 if channel in self.subscribers: 71 self.subscribers[channel] = [ 72 s for s in self.subscribers[channel] if s.subscriber != subscriber 73 ] 74 self.logger.log( 75 self.default_log_level, f"Removed subscriber from {channel} channel" 76 ) 77 78 def close_channel(self, channel, **kwargs): 79 self._deprecation_warnings(**kwargs) 80 subscribers_count = len(self.subscribers[channel]) 81 del self.subscribers[channel] 82 self.logger.log( 83 self.default_log_level, 84 f"{channel} closed ({subscribers_count} subscribers)", 85 )