/ test / functional / rpc_setban.py
rpc_setban.py
 1  #!/usr/bin/env python3
 2  # Copyright (c) 2015-2021 The Bitcoin Core developers
 3  # Distributed under the MIT software license, see the accompanying
 4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
 5  """Test the setban rpc call."""
 6  
 7  from contextlib import ExitStack
 8  from test_framework.test_framework import BitcoinTestFramework
 9  from test_framework.util import (
10      p2p_port,
11      assert_equal,
12  )
13  
class SetBanTests(BitcoinTestFramework):
    """Functional test for the `setban` RPC.

    Covers banning/unbanning a peer address, the interaction between a ban
    and the "noban" permission, banning a non-IP (Tor) address, persistence
    of the ban list across a node restart, and the `-bantime` startup option.
    """

    def set_test_params(self):
        """Configure the test: two nodes, clean chain, no extra node arguments."""
        self.num_nodes = 2
        self.setup_clean_chain = True
        self.extra_args = [[], []]

    def is_banned(self, node, addr):
        """Return True if any entry of `node.listbanned()` has 'address' equal to `addr`."""
        return any(e['address'] == addr for e in node.listbanned())

    def run_test(self):
        """Run the setban scenarios in order; later steps rely on earlier node state."""
        # Node 0 connects to Node 1, check that the noban permission is not granted
        self.connect_nodes(0, 1)
        peerinfo = self.nodes[1].getpeerinfo()[0]
        assert "noban" not in peerinfo["permissions"]

        # Node 0 gets banned by Node 1
        self.nodes[1].setban("127.0.0.1", "add")

        # Node 0 should not be able to reconnect.
        # The log assertions are entered inside the ExitStack's `with` block so
        # that an already-entered assertion is still unwound if entering a
        # later one fails.
        with ExitStack() as log_checks:
            log_checks.enter_context(self.nodes[1].assert_debug_log(expected_msgs=['dropped (banned)\n'], timeout=50))
            # When disconnected right after connecting, a v2 node will attempt to reconnect with v1.
            # Wait for that to happen so that it cannot mess with later tests.
            if self.options.v2transport:
                log_checks.enter_context(self.nodes[0].assert_debug_log(expected_msgs=['trying v1 connection'], timeout=50))
            self.restart_node(1, [])
            self.nodes[0].addnode(f"127.0.0.1:{p2p_port(1)}", "onetry")

        # However, node 0 should be able to reconnect if it has noban permission
        self.restart_node(1, ['-whitelist=127.0.0.1'])
        self.connect_nodes(0, 1)
        peerinfo = self.nodes[1].getpeerinfo()[0]
        assert "noban" in peerinfo["permissions"]

        # If we remove the ban, Node 0 should be able to reconnect even without noban permission
        self.nodes[1].setban("127.0.0.1", "remove")
        self.restart_node(1, [])
        self.connect_nodes(0, 1)
        peerinfo = self.nodes[1].getpeerinfo()[0]
        assert "noban" not in peerinfo["permissions"]

        self.log.info("Test that a non-IP address can be banned/unbanned")
        node = self.nodes[1]
        tor_addr = "pg6mmjiyjmcrsslvykfwnntlaru7p5svn6y2ymmju6nubxndf4pscryd.onion"
        # Control address that is never banned: it must stay absent from the ban list throughout.
        ip_addr = "1.2.3.4"
        assert not self.is_banned(node, tor_addr)
        assert not self.is_banned(node, ip_addr)

        node.setban(tor_addr, "add")
        assert self.is_banned(node, tor_addr)
        assert not self.is_banned(node, ip_addr)

        self.log.info("Test the ban list is preserved through restart")

        self.restart_node(1)
        assert self.is_banned(node, tor_addr)
        assert not self.is_banned(node, ip_addr)

        node.setban(tor_addr, "remove")
        assert not self.is_banned(node, tor_addr)
        assert not self.is_banned(node, ip_addr)

        # The removal must survive a restart as well.
        self.restart_node(1)
        assert not self.is_banned(node, tor_addr)
        assert not self.is_banned(node, ip_addr)

        self.log.info("Test -bantime")
        self.restart_node(1, ["-bantime=1234"])
        # No explicit ban time is passed to setban, so the reported duration
        # is expected to be the -bantime value.
        self.nodes[1].setban("127.0.0.1", "add")
        banned = self.nodes[1].listbanned()[0]
        assert_equal(banned['ban_duration'], 1234)
86  
# Script entry point: run the test only when this file is executed directly.
if __name__ == "__main__":
    SetBanTests().main()