/ watcher / ethereum.js
ethereum.js
 1  const Web3 = require("web3");
 2  const path = require("path");
 3  
 4  class Ethereum {
 5    constructor(events, config) {
 6      this.events = events;
 7      this.config = config;
 8      this.contracts = {};
 9      this.logger = null;
10    }
11  
12    init(logger) {
13      this.logger = logger;
14  
15      const provider = new Web3.providers.WebsocketProvider(this.config.BLOCKCHAIN_CONNECTION_POINT);
16      this.web3 = new Web3(provider);
17  
18      const wsExitOnError = e => {
19        logger.error("web3 - WS Disconnect", e);
20        process.exit(1);
21      }
22  
23      provider.on('error', wsExitOnError);
24      provider.on('end', wsExitOnError);
25  
26      this.web3.eth.net
27        .isListening()
28        .then(() => {
29          logger.info("Connected successfully to web3");
30          this.events.emit("web3:connected");
31        })
32        .catch(error => {
33          logger.error("web3 - ", error);
34          process.exit(1);
35        });
36    }
37  
38    sleep(milliseconds) {
39      return new Promise(resolve => setTimeout(resolve, milliseconds));
40    }
41  
42    async poll(fn) {
43      await fn();
44      await this.sleep(this.config.POLL_SLEEP * 1000);
45      await this.poll(fn);
46    }
47  
48    async getEvents(dappId, contractInstance, fromBlock, toBlock) {
49      this.logger.info("Obtaining events from block range: %s - %s", fromBlock, toBlock);
50  
51      const eventNames = contractInstance.options.jsonInterface.filter(x => x.type === "event").map(x => x.name);
52  
53      await Promise.all(
54        eventNames.map(async eventName => {
55          let events = await contractInstance.getPastEvents(eventName, {
56            fromBlock: fromBlock,
57            toBlock: toBlock
58          });
59  
60          events.forEach(ev => {
61            ev.dappId = dappId;
62            this.events.emit("web3:event", ev);
63          });
64        })
65      );
66    }
67  
68    getInstance(address, ABI) {
69      if (!this.contracts[address]) {
70        this.contracts[address] = new this.web3.eth.Contract(ABI, address);
71      }
72      return this.contracts[address];
73    }
74  
75    async scan(dappId, address, ABI, startBlock = 0) {
76      let lastBlock = await this.web3.eth.getBlockNumber();
77      let lastBlockProcessed = startBlock || lastBlock - this.config.BLOCK_DELAY;
78  
79      await this.poll(async () => {
80        try {
81          const currentBlock = await this.web3.eth.getBlockNumber();
82          lastBlock = currentBlock - this.config.BLOCK_DELAY; // To avoid losing events due to reorgs
83          if (lastBlockProcessed > lastBlock) return; // Wait until more blocks are mined
84          await this.getEvents(dappId, this.getInstance(address, ABI), lastBlockProcessed, lastBlock);
85          lastBlockProcessed = lastBlock + 1;
86        } catch (e) {
87          this.logger.error(e)
88        }
89      });
90    }
91  }
92  
 93  module.exports = Ethereum; // CommonJS export: the Ethereum class is the module's only export.