Web3js

使用 python (web3py) 訂閱新的塊頭

  • July 22, 2021

我正在尋找 web3py 中的訂閱功能,就像在 web3js 中實現的那樣

web3.eth.subscribe('newBlockHeaders' [, callback]);

由於目前在 web3py 中沒有實現它,有沒有人知道可能的實現或一些資源?

web3.py 沒有像 web3.js 這樣的觀察者的本機實現。您唯一的選擇是連接到暴露的 websocket,如下所示:

import asyncio
import json
import requests
from websockets import connect

async def get_event():
   async with connect("ws://localhost:8545") as ws:
       await ws.send({"id": 1, "method": "eth_subscribe", "params": ["newHeads"]})
       subscription_response = await ws.recv()
       print(subscription_response)
       # you are now subscribed to the event 
       # you keep trying to listen to new events (similar idea to longPolling)
       while True:
           try:
               message = await asyncio.wait_for(ws.recv(), timeout=60)
               print(json.loads(message))
               pass
           except:
               pass
if __name__ == "__main__":
   loop = asyncio.get_event_loop()
   while True:
       loop.run_until_complete(get_event())

請注意,subscription_response將有一個訂閱 ID,您可以使用它來取消訂閱。您可以從文件中找到所有 jsonRPC 方法

引用自:https://ethereum.stackexchange.com/questions/103925