Uniswap
Uniswap V3,計算達到目標價格的交易量
給定一個具有一個或多個流動性頭寸的池,是否有一種有效的算法來計算達到目標價格所需的掉期量?
例如,考慮一個具有算法再平衡和大量營運資金的穩定代幣。隨著價格偏離 1:1 美元,假設交易機器人有足夠的資金,多頭或空頭掉期可能會將價格推回錨定匯率。
機器人將如何計算訂單的數量?
讓我們假設一個包含 X 和 Y 資產的池(順序很重要)。如果 X 的目前價格低於目標價格,則必須從池中購買一些 X 資產。否則,如果 X 的價格高於目標價格,則必須從池中購買一些 Y 資產。
基本思想是使用類似於圖 4 Uniswap v3 白皮書中的交換算法的算法來計算要購買的 X 或 Y 的數量。
基本構想:
delta
將變數初始化為零。遍歷所有分時範圍,直到達到目標價格:
如果目標價格在考慮的刻度範圍內,計算達到該價格所需的代幣數量,將其添加到 中
delta
,然後中斷循環。否則,將刻度範圍內的所有代幣添加到
delta
(通過查看其流動性找到),然後切換到下一個刻度範圍。買入 X 和買入 Y 的主要邏輯是對稱的,但根據價格必須移動的方向,實施細節略有不同。
假設這些變數已經初始化:
contract
- 礦池的契約sCurrentPrice
- 目前價格的 sqrtsPriceTarget
- 目標價的平方liquidity
- 池目前分時範圍內的流動性tickLower
,tickUpper
- 目前刻度範圍的最小和最大刻度sPriceUpper
,sPriceUpper
- 對應於目前範圍的最小和最大刻度的價格的平方根tickSpacing
- 池中的刻度間距。decimalsX
,decimalsY
- X 和 Y 標記的小數位數,用於列印結果以下是概念驗證 Python 程式碼。
我們先定義一些輔助函式:
# amount of x in range; sp - sqrt of current price, sb - sqrt of max price def x_in_range(L, sp, sb): return L * (sb - sp) / (sp * sb) # amount of y in range; sp - sqrt of current price, sa - sqrt of min price def y_in_range(L, sp, sa): return L * (sp - sa) def tick_to_price(tick): return 1.0001 ** tick
主要程式碼:
from web3 import Web3 from collections import namedtuple Tick = namedtuple("Tick", "liquidityGross liquidityNet feeGrowthOutside0X128 feeGrowthOutside1X128 tickCumulativeOutside secondsPerLiquidityOutsideX128 secondsOutside initialized") # how much of X or Y tokens we need to *buy* to get to the target price? deltaTokens = 0 if sPriceTarget > sPriceCurrent: # too few Y in the pool; we need to buy some X to increase amount of Y in pool while sPriceTarget > sPriceCurrent: if sPriceTarget > sPriceUpper: # not in the current price range; use all X in the range x = x_in_range(liquidity, sPriceCurrent, sPriceUpper) deltaTokens += x # query the blockchain for liquidity in the next tick range nextTickRange = Tick(*contract.functions.ticks(tickUpper).call()) liquidity += nextTickRange.liquidityNet # adjust the price and the range limits sPriceCurrent = sPriceUpper tickLower = tickUpper tickUpper += tickSpacing sPriceLower = sPriceUpper sPriceUpper = tick_to_price(tickUpper // 2) else: # in the current price range x = x_in_range(liquidity, sPriceCurrent, sPriceTarget) deltaTokens += x sPriceCurrent = sPriceTarget print("need to buy {:.10f} X tokens".format(deltaTokens / 10 ** decimalsX)) elif sPriceTarget < sPriceCurrent: # too much Y in the pool; we need to buy some Y to decrease amount of Y in pool currentTickRange = None while sPriceTarget < sPriceCurrent: if sPriceTarget < sPriceLower: # not in the current price range; use all Y in the range y = y_in_range(liquidity, sPriceCurrent, sPriceLower) deltaTokens += y if currentTickRange is None: # query the blockchain for liquidityNet in the *current* tick range currentTickRange = Tick(*contract.functions.ticks(tickLower).call()) liquidity -= currentTickRange.liquidityNet # adjust the price and the range limits sPriceCurrent = sPriceLower tickUpper = tickLower tickLower -= tickSpacing sPriceUpper = sPriceLower sPriceLower = tick_to_price(tickLower // 2) # query the blockchain for liquidityNet in new current tick range currentTickRange = Tick(*contract.functions.ticks(tickLower).call()) else: # in the current price range y = y_in_range(liquidity, sPriceCurrent, sPriceTarget) deltaTokens += y sPriceCurrent = sPriceTarget print("need to buy {:.10f} Y tokens".format(deltaTokens / 10 ** decimalsY))
主要結果儲存在變數
deltaTokens
中,因為可以在exactOutputSingle
函式呼叫中使用帶有 then 的值。一個警告:該方法循環遍歷所有刻度範圍,直到達到目標價格。在大多數情況下這很好,但這種方法可能效率很低。如果價格差異很大並且大多數可能已初始化的分時未初始化,則查看下一個已初始化分時會更有效。另一個問題是,如果池中沒有足夠的流動性,循環將永遠不會停止,因為目前沒有其他停止條件。