network.py
1 # pylint: disable=unused-argument, consider-using-f-string 2 # pylint: disable=no-name-in-module, too-few-public-methods 3 4 """ 5 Network status 6 """ 7 8 import os 9 10 from kivy.clock import Clock 11 from kivy.properties import StringProperty 12 from kivy.uix.screenmanager import Screen 13 14 from pybitmessage import state 15 16 if os.environ.get('INSTALL_TESTS', False) and not state.backend_py3_compatible: 17 from pybitmessage.mockbm import kivy_main 18 stats = kivy_main.network.stats 19 object_tracker = kivy_main.network.objectracker 20 else: 21 from pybitmessage.network import stats, objectracker as object_tracker 22 23 24 class NetworkStat(Screen): 25 """NetworkStat class for Kivy UI""" 26 27 text_variable_1 = StringProperty(f'Total Connections::0') # noqa: E999 28 text_variable_2 = StringProperty(f'Processed 0 peer-to-peer messages') 29 text_variable_3 = StringProperty(f'Processed 0 broadcast messages') 30 text_variable_4 = StringProperty(f'Processed 0 public keys') 31 text_variable_5 = StringProperty(f'Processed 0 objects to be synced') 32 33 def __init__(self, **kwargs): 34 super().__init__(**kwargs) # pylint: disable=missing-super-argument 35 Clock.schedule_interval(self.update_stats, 1) 36 37 def update_stats(self, dt): 38 """Update network statistics""" 39 self.text_variable_1 = f'Total Connections::{len(stats.connectedHostsList())}' 40 self.text_variable_2 = f'Processed {state.numberOfMessagesProcessed} peer-to-peer messages' 41 self.text_variable_3 = f'Processed {state.numberOfBroadcastsProcessed} broadcast messages' 42 self.text_variable_4 = f'Processed {state.numberOfPubkeysProcessed} public keys' 43 self.text_variable_5 = f'Processed {object_tracker.missingObjects} objects'