Web3js
使用 python (web3py) 訂閱新的塊頭
我正在尋找 web3py 中的訂閱功能,就像在 web3js 中實現的那樣:
web3.eth.subscribe('newBlockHeaders' [, callback]);
由於目前在 web3py 中沒有實現它,有沒有人知道可能的實現或一些資源?
web3.py 沒有像 web3.js 這樣的觀察者的本機實現。您唯一的選擇是連接到暴露的 websocket,如下所示:
import asyncio import json import requests from websockets import connect async def get_event(): async with connect("ws://localhost:8545") as ws: await ws.send({"id": 1, "method": "eth_subscribe", "params": ["newHeads"]}) subscription_response = await ws.recv() print(subscription_response) # you are now subscribed to the event # you keep trying to listen to new events (similar idea to longPolling) while True: try: message = await asyncio.wait_for(ws.recv(), timeout=60) print(json.loads(message)) pass except: pass if __name__ == "__main__": loop = asyncio.get_event_loop() while True: loop.run_until_complete(get_event())
請注意,
subscription_response
將有一個訂閱 ID,您可以使用它來取消訂閱。您可以從文件中找到所有 jsonRPC 方法