/ src / bitmessagekivy / baseclass / network.py
network.py
 1  # pylint: disable=unused-argument, consider-using-f-string
 2  # pylint: disable=no-name-in-module, too-few-public-methods
 3  
 4  """
 5  Network status
 6  """
 7  
 8  import os
 9  
10  from kivy.clock import Clock
11  from kivy.properties import StringProperty
12  from kivy.uix.screenmanager import Screen
13  
14  from pybitmessage import state
15  
16  if os.environ.get('INSTALL_TESTS', False) and not state.backend_py3_compatible:
17      from pybitmessage.mockbm import kivy_main
18      stats = kivy_main.network.stats
19      object_tracker = kivy_main.network.objectracker
20  else:
21      from pybitmessage.network import stats, objectracker as object_tracker
22  
23  
class NetworkStat(Screen):
    """Kivy screen that displays live network statistics.

    Five string properties hold the status lines.  They start with
    zeroed placeholder text and are refreshed once per second by
    :meth:`update_stats`, which is scheduled on the Kivy clock in
    ``__init__``.
    """

    # Placeholder labels shown until the first scheduled refresh runs.
    # The wording must match what update_stats() writes so the labels
    # do not change shape after the first tick.
    text_variable_1 = StringProperty('Total Connections::0')
    text_variable_2 = StringProperty('Processed 0 peer-to-peer messages')
    text_variable_3 = StringProperty('Processed 0 broadcast messages')
    text_variable_4 = StringProperty('Processed 0 public keys')
    text_variable_5 = StringProperty('Processed 0 objects to be synced')

    def __init__(self, **kwargs):
        """Build the screen and schedule the periodic statistics refresh."""
        super().__init__(**kwargs)  # pylint: disable=missing-super-argument
        # Refresh the labels every second for the lifetime of the app.
        Clock.schedule_interval(self.update_stats, 1)

    def update_stats(self, dt):
        """Update network statistics.

        :param dt: value passed by the Kivy clock scheduler; not used.
        """
        connections = len(stats.connectedHostsList())
        messages = state.numberOfMessagesProcessed
        broadcasts = state.numberOfBroadcastsProcessed
        pubkeys = state.numberOfPubkeysProcessed
        # missingObjects is a container of objects still to be fetched
        # (presumably a mapping -- confirm against objectracker), so show
        # its size; interpolating it directly would render its repr.
        pending = len(object_tracker.missingObjects)

        self.text_variable_1 = f'Total Connections::{connections}'  # noqa: E999
        self.text_variable_2 = f'Processed {messages} peer-to-peer messages'
        self.text_variable_3 = f'Processed {broadcasts} broadcast messages'
        self.text_variable_4 = f'Processed {pubkeys} public keys'
        self.text_variable_5 = f'Processed {pending} objects to be synced'