interface_rest.py
#!/usr/bin/env python3
# Copyright (c) 2014-present The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test the REST API."""

from decimal import Decimal
from enum import Enum
from io import BytesIO
import http.client
import json
import typing
import urllib.parse


from test_framework.messages import (
    BLOCK_HEADER_SIZE,
    COIN,
    deser_block_spent_outputs,
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
    assert_equal,
    assert_greater_than,
    assert_greater_than_or_equal,
)
from test_framework.wallet import (
    MiniWallet,
    getnewdestination,
)


# Not a parsable hash/height: the endpoints below answer it with HTTP 400.
INVALID_PARAM = "abc"
# Well-formed 64-hex-digit hash that the /tx and /block lookups below answer with HTTP 404.
UNKNOWN_PARAM = "0" * 64


class ReqType(Enum):
    """Format suffix appended to the REST URI (lower-cased member name)."""
    JSON = 1
    BIN = 2
    HEX = 3


class RetType(Enum):
    """Shape in which test_rest_request() hands the response back."""
    OBJ = 1    # the raw http.client.HTTPResponse, body unread
    BYTES = 2  # the raw response body
    JSON = 3   # the body decoded as JSON (floats parsed as Decimal)


def filter_output_indices_by_value(vouts, value):
    """Yield the 'n' index of every entry in vouts whose 'value' equals value."""
    for vout in vouts:
        if vout['value'] == value:
            yield vout['n']


class RESTTest(BitcoinTestFramework):
    """Functional test exercising the REST endpoints served by node 0."""

    def set_test_params(self):
        """Configure two nodes; only node 0 enables REST and the block filter index."""
        self.num_nodes = 2
        self.extra_args = [["-rest", "-blockfilterindex=1"], []]
        # whitelist peers to speed up tx relay / mempool sync
        self.noban_tx_relay = True
        self.supports_cli = False

    def test_rest_request(
            self,
            uri: str,
            http_method: str = 'GET',
            req_type: ReqType = ReqType.JSON,
            body: str = '',
            status: int = 200,
            ret_type: RetType = RetType.JSON,
            query_params: typing.Union[dict[str, typing.Any], str, None] = None,
            ) -> typing.Union[http.client.HTTPResponse, bytes, str, None]:
        """Send one request to the REST interface and check the HTTP status.

        The request path is '/rest' + uri + '.<req_type name in lower case>',
        followed by the query string when query_params is given. A str is
        appended verbatim (useful for deliberately malformed queries), a dict
        is url-encoded first.

        Args:
            uri: endpoint path below '/rest', starting with '/'.
            http_method: 'GET' or 'POST'; body is only sent for 'POST'.
            req_type: format suffix to request.
            body: request body for 'POST' requests.
            status: HTTP status code the response is asserted to have.
            ret_type: how to return the response (see RetType).
            query_params: optional query string (str) or parameters (dict).

        Returns:
            The HTTPResponse itself (RetType.OBJ, body left unread for the
            caller), the body bytes (RetType.BYTES) or the decoded JSON value
            (RetType.JSON). None for any other ret_type.

        Raises:
            ValueError: if http_method is neither 'GET' nor 'POST'.
            AssertionError: if the response status differs from status.
        """
        rest_uri = '/rest' + uri
        if req_type in ReqType:
            rest_uri += f'.{req_type.name.lower()}'
        if query_params:
            if isinstance(query_params, str):
                rest_uri += f'?{query_params}'
            else:
                rest_uri += f'?{urllib.parse.urlencode(query_params)}'

        # Reject unsupported methods up front. Previously nothing was sent and
        # getresponse() failed with an unrelated "response not ready" error.
        if http_method not in ('GET', 'POST'):
            raise ValueError(f"Unsupported HTTP method: {http_method}")

        conn = http.client.HTTPConnection(self.url.hostname, self.url.port)
        self.log.debug(f'{http_method} {rest_uri} {body}')
        if http_method == 'GET':
            conn.request('GET', rest_uri)
        else:
            conn.request('POST', rest_uri, body)
        resp = conn.getresponse()

        assert resp.status == status, f"Expected: {status}, Got: {resp.status} ({resp.reason}) - Response: {str(resp.read())}"

        if ret_type == RetType.OBJ:
            # The caller reads headers/body from the response, so the
            # connection has to stay open here.
            return resp

        # The body is consumed in full below, so the connection can be
        # released instead of lingering until garbage collection.
        try:
            payload = resp.read()
        finally:
            conn.close()

        if ret_type == RetType.BYTES:
            return payload
        elif ret_type == RetType.JSON:
            return json.loads(payload.decode('utf-8'), parse_float=Decimal)

        return None

    def run_test(self):
        """Run all REST endpoint checks against node 0."""
        self.url = urllib.parse.urlparse(self.nodes[0].url)
        self.wallet = MiniWallet(self.nodes[0])

        self.log.info("Broadcast test transaction and sync nodes")
        txid = self.wallet.send_to(from_node=self.nodes[0], scriptPubKey=getnewdestination()[1], amount=int(0.1 * COIN))["txid"]
        self.sync_all()

        self.log.info("Test the /tx URI")

        json_obj = self.test_rest_request(f"/tx/{txid}")
        assert_equal(json_obj['txid'], txid)

        # Check hex format response
        hex_response = self.test_rest_request(f"/tx/{txid}", req_type=ReqType.HEX, ret_type=RetType.OBJ)
        assert_greater_than_or_equal(int(hex_response.getheader('content-length')),
                                     json_obj['size']*2)

        spent = (json_obj['vin'][0]['txid'], json_obj['vin'][0]['vout'])  # get the vin to later check for utxo (should be spent by then)
        # get n of 0.1 outpoint
        n, = filter_output_indices_by_value(json_obj['vout'], Decimal('0.1'))
        spending = (txid, n)

        # Test /tx with an invalid and an unknown txid
        resp = self.test_rest_request(uri=f"/tx/{INVALID_PARAM}", ret_type=RetType.OBJ, status=400)
        assert_equal(resp.read().decode('utf-8').rstrip(), f"Invalid hash: {INVALID_PARAM}")
        resp = self.test_rest_request(uri=f"/tx/{UNKNOWN_PARAM}", ret_type=RetType.OBJ, status=404)
        assert_equal(resp.read().decode('utf-8').rstrip(), f"{UNKNOWN_PARAM} not found")

        self.log.info("Query an unspent TXO using the /getutxos URI")

        self.generate(self.wallet, 1)
        bb_hash = self.nodes[0].getbestblockhash()

        # Check chainTip response
        json_obj = self.test_rest_request(f"/getutxos/{spending[0]}-{spending[1]}")
        assert_equal(json_obj['chaintipHash'], bb_hash)

        # Make sure there is one utxo
        assert_equal(len(json_obj['utxos']), 1)
        assert_equal(json_obj['utxos'][0]['value'], Decimal('0.1'))

        self.log.info("Query a spent TXO using the /getutxos URI")

        json_obj = self.test_rest_request(f"/getutxos/{spent[0]}-{spent[1]}")

        # Check chainTip response
        assert_equal(json_obj['chaintipHash'], bb_hash)

        # Make sure there is no utxo in the response because this outpoint has been spent
        assert_equal(len(json_obj['utxos']), 0)

        # Check bitmap
        assert_equal(json_obj['bitmap'], "0")

        self.log.info("Query two TXOs using the /getutxos URI")

        json_obj = self.test_rest_request(f"/getutxos/{spending[0]}-{spending[1]}/{spent[0]}-{spent[1]}")

        assert_equal(len(json_obj['utxos']), 1)
        assert_equal(json_obj['bitmap'], "10")

        self.log.info("Query the TXOs using the /getutxos URI with a binary response")

        bin_request = b'\x01\x02'
        for txid, n in [spending, spent]:
            bin_request += bytes.fromhex(txid)
            bin_request += n.to_bytes(4, 'little')

        bin_response = self.test_rest_request("/getutxos", http_method='POST', req_type=ReqType.BIN, body=bin_request, ret_type=RetType.BYTES)
        chain_height = int.from_bytes(bin_response[0:4], 'little')
        response_hash = bin_response[4:36][::-1].hex()

        assert_equal(bb_hash, response_hash)  # check if getutxo's chaintip during calculation was fine
        assert_equal(chain_height, 201)  # chain height must be 201 (pre-mined chain [200] + generated block [1])

        self.log.info("Test the /getutxos URI with and without /checkmempool")
        # Create a transaction, check that it's found with /checkmempool, but
        # not found without. Then confirm the transaction and check that it's
        # found with or without /checkmempool.

        # do a tx and don't sync
        txid = self.wallet.send_to(from_node=self.nodes[0], scriptPubKey=getnewdestination()[1], amount=int(0.1 * COIN))["txid"]
        json_obj = self.test_rest_request(f"/tx/{txid}")
        # get the spent output to later check for utxo (should be spent by then)
        spent = (json_obj['vin'][0]['txid'], json_obj['vin'][0]['vout'])
        # get n of 0.1 outpoint
        n, = filter_output_indices_by_value(json_obj['vout'], Decimal('0.1'))
        spending = (txid, n)

        json_obj = self.test_rest_request(f"/getutxos/{spending[0]}-{spending[1]}")
        assert_equal(len(json_obj['utxos']), 0)

        json_obj = self.test_rest_request(f"/getutxos/checkmempool/{spending[0]}-{spending[1]}")
        assert_equal(len(json_obj['utxos']), 1)

        json_obj = self.test_rest_request(f"/getutxos/{spent[0]}-{spent[1]}")
        assert_equal(len(json_obj['utxos']), 1)

        json_obj = self.test_rest_request(f"/getutxos/checkmempool/{spent[0]}-{spent[1]}")
        assert_equal(len(json_obj['utxos']), 0)

        self.generate(self.nodes[0], 1)

        json_obj = self.test_rest_request(f"/getutxos/{spending[0]}-{spending[1]}")
        assert_equal(len(json_obj['utxos']), 1)

        json_obj = self.test_rest_request(f"/getutxos/checkmempool/{spending[0]}-{spending[1]}")
        assert_equal(len(json_obj['utxos']), 1)

        self.log.info("Check some invalid requests")
        self.test_rest_request("/getutxos", http_method='POST', req_type=ReqType.JSON, body='{"checkmempool', status=400, ret_type=RetType.OBJ)
        self.test_rest_request("/getutxos", http_method='POST', req_type=ReqType.BIN, body='{"checkmempool', status=400, ret_type=RetType.OBJ)
        self.test_rest_request("/getutxos/checkmempool", http_method='POST', req_type=ReqType.JSON, status=400, ret_type=RetType.OBJ)
        self.test_rest_request(f"/getutxos/{spending[0]}_+1", ret_type=RetType.OBJ, status=400)
        self.test_rest_request(f"/getutxos/{spending[0]}-+1", ret_type=RetType.OBJ, status=400)
        self.test_rest_request(f"/getutxos/{spending[0]}--1", ret_type=RetType.OBJ, status=400)
        self.test_rest_request(f"/getutxos/{spending[0]}aa-1234", ret_type=RetType.OBJ, status=400)
        self.test_rest_request("/getutxos/aa-1234", ret_type=RetType.OBJ, status=400)

        # Test limits
        long_uri = '/'.join([f"{txid}-{n_}" for n_ in range(20)])
        self.test_rest_request(f"/getutxos/checkmempool/{long_uri}", http_method='POST', status=400, ret_type=RetType.OBJ)

        long_uri = '/'.join([f'{txid}-{n_}' for n_ in range(15)])
        self.test_rest_request(f"/getutxos/checkmempool/{long_uri}", http_method='POST', status=200)

        self.generate(self.nodes[0], 1)  # generate block to not affect upcoming tests

        self.log.info("Test the /block, /blockhashbyheight, /headers, and /blockfilterheaders URIs")
        bb_hash = self.nodes[0].getbestblockhash()

        # Check result if block does not exist
        assert_equal(self.test_rest_request(f"/headers/{UNKNOWN_PARAM}", query_params={"count": 1}), [])
        self.test_rest_request(f"/block/{UNKNOWN_PARAM}", status=404, ret_type=RetType.OBJ)

        # Check result if block is not in the active chain
        self.nodes[0].invalidateblock(bb_hash)
        assert_equal(self.test_rest_request(f'/headers/{bb_hash}', query_params={'count': 1}), [])
        self.test_rest_request(f'/block/{bb_hash}')
        self.nodes[0].reconsiderblock(bb_hash)

        # Check binary format
        response = self.test_rest_request(f"/block/{bb_hash}", req_type=ReqType.BIN, ret_type=RetType.OBJ)
        assert_greater_than(int(response.getheader('content-length')), BLOCK_HEADER_SIZE)
        response_bytes = response.read()

        # Compare with block header
        response_header = self.test_rest_request(f"/headers/{bb_hash}", req_type=ReqType.BIN, ret_type=RetType.OBJ, query_params={"count": 1})
        assert_equal(int(response_header.getheader('content-length')), BLOCK_HEADER_SIZE)
        response_header_bytes = response_header.read()
        assert_equal(response_bytes[:BLOCK_HEADER_SIZE], response_header_bytes)

        # Check block hex format
        response_hex = self.test_rest_request(f"/block/{bb_hash}", req_type=ReqType.HEX, ret_type=RetType.OBJ)
        assert_greater_than(int(response_hex.getheader('content-length')), BLOCK_HEADER_SIZE*2)
        response_hex_bytes = response_hex.read().strip(b'\n')
        assert_equal(response_bytes.hex().encode(), response_hex_bytes)

        # Compare with hex block header
        response_header_hex = self.test_rest_request(f"/headers/{bb_hash}", req_type=ReqType.HEX, ret_type=RetType.OBJ, query_params={"count": 1})
        assert_greater_than(int(response_header_hex.getheader('content-length')), BLOCK_HEADER_SIZE*2)
        response_header_hex_bytes = response_header_hex.read(BLOCK_HEADER_SIZE*2)
        assert_equal(response_bytes[:BLOCK_HEADER_SIZE].hex().encode(), response_header_hex_bytes)

        # Check json format
        block_json_obj = self.test_rest_request(f"/block/{bb_hash}")
        assert_equal(block_json_obj['hash'], bb_hash)
        assert_equal(self.test_rest_request(f"/blockhashbyheight/{block_json_obj['height']}")['blockhash'], bb_hash)

        # Check hex/bin format
        resp_hex = self.test_rest_request(f"/blockhashbyheight/{block_json_obj['height']}", req_type=ReqType.HEX, ret_type=RetType.OBJ)
        assert_equal(resp_hex.read().decode('utf-8').rstrip(), bb_hash)
        resp_bytes = self.test_rest_request(f"/blockhashbyheight/{block_json_obj['height']}", req_type=ReqType.BIN, ret_type=RetType.BYTES)
        blockhash = resp_bytes[::-1].hex()
        assert_equal(blockhash, bb_hash)

        # Check invalid blockhashbyheight requests
        resp = self.test_rest_request(f"/blockhashbyheight/{INVALID_PARAM}", ret_type=RetType.OBJ, status=400)
        assert_equal(resp.read().decode('utf-8').rstrip(), f"Invalid height: {INVALID_PARAM}")
        resp = self.test_rest_request("/blockhashbyheight/+1", ret_type=RetType.OBJ, status=400)
        assert_equal(resp.read().decode('utf-8').rstrip(), "Invalid height: +1")
        resp = self.test_rest_request("/blockhashbyheight/1000000", ret_type=RetType.OBJ, status=404)
        assert_equal(resp.read().decode('utf-8').rstrip(), "Block height out of range")
        resp = self.test_rest_request("/blockhashbyheight/-1", ret_type=RetType.OBJ, status=400)
        assert_equal(resp.read().decode('utf-8').rstrip(), "Invalid height: -1")
        self.test_rest_request("/blockhashbyheight/", ret_type=RetType.OBJ, status=400)

        # Compare with json block header
        json_obj = self.test_rest_request(f"/headers/{bb_hash}", query_params={"count": 1})
        assert_equal(len(json_obj), 1)  # ensure that there is one header in the json response
        assert_equal(json_obj[0]['hash'], bb_hash)  # request/response hash should be the same

        # Check invalid uri (% symbol at the end of the request)
        for invalid_uri in [f"/headers/{bb_hash}%", f"/blockfilterheaders/basic/{bb_hash}%", "/mempool/contents.json?%"]:
            resp = self.test_rest_request(invalid_uri, ret_type=RetType.OBJ, status=400)
            assert_equal(resp.read().decode('utf-8').rstrip(), "URI parsing failed, it likely contained RFC 3986 invalid characters")

        # Compare with normal RPC block response
        rpc_block_json = self.nodes[0].getblock(bb_hash)
        for key in ['hash', 'confirmations', 'height', 'version', 'merkleroot', 'time', 'nonce', 'bits', 'target', 'difficulty', 'chainwork', 'previousblockhash']:
            assert_equal(json_obj[0][key], rpc_block_json[key])

        # See if we can get 5 headers in one response
        self.generate(self.nodes[1], 5)
        expected_filter = {
            'basic block filter index': {'synced': True, 'best_block_height': 208},
        }
        self.wait_until(lambda: self.nodes[0].getindexinfo() == expected_filter)
        json_obj = self.test_rest_request(f"/headers/{bb_hash}", query_params={"count": 5})
        assert_equal(len(json_obj), 5)  # now we should have 5 header objects
        json_obj = self.test_rest_request(f"/blockfilterheaders/basic/{bb_hash}", query_params={"count": 5})
        first_filter_header = json_obj[0]
        assert_equal(len(json_obj), 5)  # now we should have 5 filter header objects
        json_obj = self.test_rest_request(f"/blockfilter/basic/{bb_hash}")

        # Compare with normal RPC blockfilter response
        rpc_blockfilter = self.nodes[0].getblockfilter(bb_hash)
        assert_equal(first_filter_header, rpc_blockfilter['header'])
        assert_equal(json_obj['filter'], rpc_blockfilter['filter'])

        # Test blockfilterheaders with an invalid hash and filtertype
        resp = self.test_rest_request(f"/blockfilterheaders/{INVALID_PARAM}/{bb_hash}", ret_type=RetType.OBJ, status=400)
        assert_equal(resp.read().decode('utf-8').rstrip(), f"Unknown filtertype {INVALID_PARAM}")
        resp = self.test_rest_request(f"/blockfilterheaders/basic/{INVALID_PARAM}", ret_type=RetType.OBJ, status=400)
        assert_equal(resp.read().decode('utf-8').rstrip(), f"Invalid hash: {INVALID_PARAM}")

        # Test number parsing
        for num in ['5a', '-5', '0', '2001', '99999999999999999999999999999999999']:
            assert_equal(
                bytes(f'Header count is invalid or out of acceptable range (1-2000): {num}\r\n', 'ascii'),
                self.test_rest_request(f"/headers/{bb_hash}", ret_type=RetType.BYTES, status=400, query_params={"count": num}),
            )

        self.log.info("Test tx inclusion in the /mempool and /block URIs")

        # Make 3 chained txs and mine them on node 1
        txs = []
        input_txid = txid
        for _ in range(3):
            utxo_to_spend = self.wallet.get_utxo(txid=input_txid)
            txs.append(self.wallet.send_self_transfer(from_node=self.nodes[0], utxo_to_spend=utxo_to_spend)['txid'])
            input_txid = txs[-1]
        self.sync_all()

        # Check that there are exactly 3 transactions in the TX memory pool before generating the block
        json_obj = self.test_rest_request("/mempool/info")
        assert_equal(json_obj['size'], 3)
        # the size of the memory pool should be greater than 3x ~100 bytes
        assert_greater_than(json_obj['bytes'], 300)

        mempool_info = self.nodes[0].getmempoolinfo()
        # pop unstable unbroadcastcount before check
        for obj in [json_obj, mempool_info]:
            obj.pop("unbroadcastcount")
        assert_equal(json_obj, mempool_info)

        # Check that there are our submitted transactions in the TX memory pool
        json_obj = self.test_rest_request("/mempool/contents")
        raw_mempool_verbose = self.nodes[0].getrawmempool(verbose=True)

        assert_equal(json_obj, raw_mempool_verbose)

        for i, tx in enumerate(txs):
            assert tx in json_obj
            assert_equal(json_obj[tx]['spentby'], txs[i + 1:i + 2])
            assert_equal(json_obj[tx]['depends'], txs[i - 1:i])

        # Check the mempool response for explicit parameters
        json_obj = self.test_rest_request("/mempool/contents", query_params={"verbose": "true", "mempool_sequence": "false"})
        assert_equal(json_obj, raw_mempool_verbose)

        # Check the mempool response for not verbose
        json_obj = self.test_rest_request("/mempool/contents", query_params={"verbose": "false"})
        raw_mempool = self.nodes[0].getrawmempool(verbose=False)

        assert_equal(json_obj, raw_mempool)

        # Check the mempool response for sequence
        json_obj = self.test_rest_request("/mempool/contents", query_params={"verbose": "false", "mempool_sequence": "true"})
        raw_mempool = self.nodes[0].getrawmempool(verbose=False, mempool_sequence=True)

        assert_equal(json_obj, raw_mempool)

        # Check for error response if verbose=true and mempool_sequence=true
        resp = self.test_rest_request("/mempool/contents", ret_type=RetType.OBJ, status=400, query_params={"verbose": "true", "mempool_sequence": "true"})
        assert_equal(resp.read().decode('utf-8').strip(), 'Verbose results cannot contain mempool sequence values. (hint: set "verbose=false")')

        # Check for error response if verbose is not "true" or "false"
        resp = self.test_rest_request("/mempool/contents", ret_type=RetType.OBJ, status=400, query_params={"verbose": "TRUE"})
        assert_equal(resp.read().decode('utf-8').strip(), 'The "verbose" query parameter must be either "true" or "false".')

        # Check for error response if mempool_sequence is not "true" or "false"
        resp = self.test_rest_request("/mempool/contents", ret_type=RetType.OBJ, status=400, query_params={"verbose": "false", "mempool_sequence": "TRUE"})
        assert_equal(resp.read().decode('utf-8').strip(), 'The "mempool_sequence" query parameter must be either "true" or "false".')

        # Now mine the transactions
        newblockhash = self.generate(self.nodes[1], 1)

        # Check if the 3 tx show up in the new block
        json_obj = self.test_rest_request(f"/block/{newblockhash[0]}")
        non_coinbase_txs = {tx['txid'] for tx in json_obj['tx']
                            if 'coinbase' not in tx['vin'][0]}
        assert_equal(non_coinbase_txs, set(txs))

        # Verify that the non-coinbase tx has "prevout" key set
        for tx_obj in json_obj["tx"]:
            for vin in tx_obj["vin"]:
                if "coinbase" not in vin:
                    assert "prevout" in vin
                    assert_equal(vin["prevout"]["generated"], False)
                else:
                    assert "prevout" not in vin

        # Check the same but without tx details
        json_obj = self.test_rest_request(f"/block/notxdetails/{newblockhash[0]}")
        for tx in txs:
            assert tx in json_obj['tx']

        self.log.info("Test the /chaininfo URI")

        bb_hash = self.nodes[0].getbestblockhash()

        json_obj = self.test_rest_request("/chaininfo")
        assert_equal(json_obj['bestblockhash'], bb_hash)

        # Compare with normal RPC getblockchaininfo response
        blockchain_info = self.nodes[0].getblockchaininfo()
        assert_equal(blockchain_info, json_obj)

        # Test compatibility of deprecated and newer endpoints
        self.log.info("Test compatibility of deprecated and newer endpoints")
        assert_equal(self.test_rest_request(f"/headers/{bb_hash}", query_params={"count": 1}), self.test_rest_request(f"/headers/1/{bb_hash}"))
        assert_equal(self.test_rest_request(f"/blockfilterheaders/basic/{bb_hash}", query_params={"count": 1}), self.test_rest_request(f"/blockfilterheaders/basic/5/{bb_hash}"))

        self.log.info("Test the /spenttxouts URI")

        block_count = self.nodes[0].getblockcount()
        for height in range(0, block_count + 1):
            blockhash = self.nodes[0].getblockhash(height)
            spent_bin = self.test_rest_request(f"/spenttxouts/{blockhash}", req_type=ReqType.BIN, ret_type=RetType.BYTES)
            spent_hex = self.test_rest_request(f"/spenttxouts/{blockhash}", req_type=ReqType.HEX, ret_type=RetType.BYTES)
            spent_json = self.test_rest_request(f"/spenttxouts/{blockhash}", req_type=ReqType.JSON, ret_type=RetType.JSON)

            assert_equal(bytes.fromhex(spent_hex.decode()), spent_bin)

            spent = deser_block_spent_outputs(BytesIO(spent_bin))
            block = self.nodes[0].getblock(blockhash, 3)  # return prevout for each input
            assert_equal(len(spent), len(block["tx"]))
            assert_equal(len(spent_json), len(block["tx"]))

            for i, tx in enumerate(block["tx"]):
                prevouts = [txin["prevout"] for txin in tx["vin"] if "coinbase" not in txin]
                # compare with `getblock` JSON output (coinbase tx has no prevouts)
                actual = [(txout.scriptPubKey.hex(), Decimal(txout.nValue) / COIN) for txout in spent[i]]
                expected = [(p["scriptPubKey"]["hex"], p["value"]) for p in prevouts]
                assert_equal(expected, actual)
                # also compare JSON format
                actual = [(prevout["scriptPubKey"], prevout["value"]) for prevout in spent_json[i]]
                expected = [(p["scriptPubKey"], p["value"]) for p in prevouts]
                assert_equal(expected, actual)

        self.log.info("Test the /blockpart URI")

        blockhash = self.nodes[0].getbestblockhash()
        block_bin = self.test_rest_request(f"/block/{blockhash}", req_type=ReqType.BIN, ret_type=RetType.BYTES)
        for req_type in (ReqType.BIN, ReqType.HEX):
            def get_block_part(status: int = 200, **kwargs):
                """Fetch /blockpart for the current req_type, decoding hex on success."""
                resp = self.test_rest_request(f"/blockpart/{blockhash}", status=status,
                                              req_type=req_type, ret_type=RetType.BYTES, **kwargs)
                assert isinstance(resp, bytes)
                if req_type is ReqType.HEX and status == 200:
                    resp = bytes.fromhex(resp.decode().strip())
                return resp

            assert_equal(block_bin, get_block_part(query_params={"offset": 0, "size": len(block_bin)}))

            assert len(block_bin) >= 500
            assert_equal(block_bin[20:320], get_block_part(query_params={"offset": 20, "size": 300}))
            assert_equal(block_bin[-5:], get_block_part(query_params={"offset": len(block_bin) - 5, "size": 5}))

            get_block_part(status=400, query_params={"offset": 10})
            get_block_part(status=400, query_params={"size": 100})
            get_block_part(status=400, query_params={"offset": "x"})
            get_block_part(status=400, query_params={"size": "y"})
            get_block_part(status=400, query_params={"offset": "x", "size": "y"})
            assert get_block_part(status=400, query_params="%XY").decode("utf-8").startswith("URI parsing failed")

            get_block_part(status=400, query_params={"offset": 0, "size": 0})
            get_block_part(status=400, query_params={"offset": len(block_bin), "size": 0})
            get_block_part(status=400, query_params={"offset": len(block_bin), "size": 1})
            get_block_part(status=400, query_params={"offset": len(block_bin) + 1, "size": 1})
            get_block_part(status=400, query_params={"offset": 0, "size": len(block_bin) + 1})

        res = self.test_rest_request(f"/blockpart/{blockhash}", status=400, req_type=ReqType.BIN, ret_type=RetType.OBJ)
        assert res.read().decode().startswith("Block part offset missing or invalid")

        res = self.test_rest_request(f"/blockpart/{blockhash}", query_params={"offset":0, "size":1}, status=400, req_type=ReqType.JSON, ret_type=RetType.OBJ)
        assert res.read().decode().startswith("JSON output is not supported for this request type")

        self.log.info("Missing block data should cause REST API to fail")

        self.test_rest_request(f"/block/{blockhash}", status=200, req_type=ReqType.BIN, ret_type=RetType.OBJ)
        self.test_rest_request(f"/blockpart/{blockhash}", query_params={"offset": 0, "size": 1}, status=200, req_type=ReqType.BIN, ret_type=RetType.OBJ)
        blk_files = list(self.nodes[0].blocks_path.glob("blk*.dat"))
        # Track what was actually moved so the finally clause restores exactly
        # those files, even if a rename or one of the assertions below fails.
        moved_blk_files = []
        try:
            for blk_file in blk_files:
                blk_file.rename(blk_file.with_suffix('.bkp'))
                moved_blk_files.append(blk_file)
            self.test_rest_request(f"/block/{blockhash}", status=500, req_type=ReqType.BIN, ret_type=RetType.OBJ)
            self.test_rest_request(f"/blockpart/{blockhash}", query_params={"offset": 0, "size": 1}, status=500, req_type=ReqType.BIN, ret_type=RetType.OBJ)
        finally:
            for blk_file in moved_blk_files:
                blk_file.with_suffix('.bkp').rename(blk_file)

        self.log.info("Test the /deploymentinfo URI")

        deployment_info = self.nodes[0].getdeploymentinfo()
        assert_equal(deployment_info, self.test_rest_request('/deploymentinfo'))

        previous_bb_hash = self.nodes[0].getblockhash(self.nodes[0].getblockcount() - 1)
        deployment_info = self.nodes[0].getdeploymentinfo(previous_bb_hash)
        assert_equal(deployment_info, self.test_rest_request(f"/deploymentinfo/{previous_bb_hash}"))

        non_existing_blockhash = '42759cde25462784395a337460bde75f58e73d3f08bd31fdc3507cbac856a2c4'
        resp = self.test_rest_request(f'/deploymentinfo/{non_existing_blockhash}', ret_type=RetType.OBJ, status=400)
        assert_equal(resp.read().decode('utf-8').rstrip(), "Block not found")

        resp = self.test_rest_request(f"/deploymentinfo/{INVALID_PARAM}", ret_type=RetType.OBJ, status=400)
        assert_equal(resp.read().decode('utf-8').rstrip(), f"Invalid hash: {INVALID_PARAM}")


if __name__ == '__main__':
    RESTTest(__file__).main()