Web3漫游记——MEV机器人实践1
source link: https://www.hi-roy.com/posts/web3%E6%BC%AB%E6%B8%B8%E8%AE%B0mev%E6%9C%BA%E5%99%A8%E4%BA%BA%E5%AE%9E%E8%B7%B51/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
Web3漫游记——MEV机器人实践1
今天开始进入技术同学最喜欢的代码环节,虽然前一篇文章《Web3漫游记——MEV套利技能树》主推Rust作为首选,但后续文章会以Golang为主。主要因为个人技术栈是Golang+Python,用熟悉的语言可以更专注于MEV相关逻辑实现。而使用Golang而非Python则是为了顺便熟悉一下Geth这个库,为了后面做私有节点优化提前做一些准备。有其他语言编程基础的小伙伴看懂应该都不是问题,只不过用到的库以及语法可能略有差别而已。
另外提醒一下Python技术栈的小伙伴,截止到发文日期,Python3.11
、web3.py-v6.9
和web3-flashbots-v1.1.1
的SDK目前有兼容性问题,使用python3.8可正常测试。
首先简单看一下目录结构:
├── README.md
├── .env
├── abi
│ ├── ERC20.json
│ ├── UniswapV2Factory.json
│ ├── UniswapV2Pair.json
│ ├── V2ArbBot.json
│ ├── WETH.json
│ └── erc20.go
├── go.mod
├── go.sum
└── main.go
这里为了简(tou)单(lan)就都写到main.go
里了,并且没考虑各种异常情况和优化,大家真正写工程级别代码时注意一下细节。
如何建立链接
package main
import (
"context"
"fmt"
erc20 "gevm/abi"
"log"
"math/big"
"math/rand"
"os"
"strings"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
_ "github.com/joho/godotenv/autoload"
)
func main() {
client, err := ethclient.Dial(os.Getenv("WSS_URL"))
if err != nil {
log.Fatal(err)
}
log.Println("connected!")
}
其中autoload
库会自动加载当前目录下.env
文件到环境变量中,.env
内容就是节点服务的http和ws连接地址,比如:
HTTP_URL=https://eth-mainnet.g.alchemy.com/v2/{APIKEY}
WSS_URL=wss://eth-mainnet.g.alchemy.com/v2/{APIKEY}
不要把任何APIKEY、私钥相关的硬编码到程序中,是一个非常重要的习惯。
如何监控最新的区块
func subcribeNewBlock(client *ethclient.Client) {
headers := make(chan *types.Header)
sub, err := client.SubscribeNewHead(context.Background(), headers)
if err != nil {
log.Fatal(err)
}
for {
select {
case err := <-sub.Err():
log.Fatal(err)
case header := <-headers:
fmt.Println(header.Hash())
calcNextBlockBaseFee(header.GasUsed, header.GasLimit, header.BaseFee.Uint64())
}
}
}
由于我们的client使用的是ws连接,所以当有新的区块生成,headers
这个channel就会收到数据,其定义如下:
// Header represents a block header in the Ethereum blockchain.
type Header struct {
ParentHash common.Hash `json:"parentHash" gencodec:"required"`
UncleHash common.Hash `json:"sha3Uncles" gencodec:"required"`
Coinbase common.Address `json:"miner"`
Root common.Hash `json:"stateRoot" gencodec:"required"`
TxHash common.Hash `json:"transactionsRoot" gencodec:"required"`
ReceiptHash common.Hash `json:"receiptsRoot" gencodec:"required"`
Bloom Bloom `json:"logsBloom" gencodec:"required"`
Difficulty *big.Int `json:"difficulty" gencodec:"required"`
Number *big.Int `json:"number" gencodec:"required"`
GasLimit uint64 `json:"gasLimit" gencodec:"required"`
GasUsed uint64 `json:"gasUsed" gencodec:"required"`
Time uint64 `json:"timestamp" gencodec:"required"`
Extra []byte `json:"extraData" gencodec:"required"`
MixDigest common.Hash `json:"mixHash"`
Nonce BlockNonce `json:"nonce"`
// BaseFee was added by EIP-1559 and is ignored in legacy headers.
BaseFee *big.Int `json:"baseFeePerGas" rlp:"optional"`
// WithdrawalsHash was added by EIP-4895 and is ignored in legacy headers.
WithdrawalsHash *common.Hash `json:"withdrawalsRoot" rlp:"optional"`
// BlobGasUsed was added by EIP-4844 and is ignored in legacy headers.
BlobGasUsed *uint64 `json:"blobGasUsed" rlp:"optional"`
// ExcessBlobGas was added by EIP-4844 and is ignored in legacy headers.
ExcessBlobGas *uint64 `json:"excessBlobGas" rlp:"optional"`
}
上面代码中我们简单的输出了一下区块的哈希值,然后去预估下一个区块的BaseGasFee。如果需要当前区块的所有交易信息,可以调用client.BlockByHash(context.Background(), header.Hash())
函数,这里就不展开了。
如何估算下一个区块的基础GasFee
const (
EIP1559_ELASTICITY_MULTIPLIER = 2
BASE_FEE_CHANGE_DENOMINATOR = 8
)
func calcNextBlockBaseFee(gasUsed, gasLimit, baseFee uint64) (newBaseFee uint64) {
// https://github.com/ethereum/EIPs/blob/master/EIPS/eip-1559.md
// https://github.com/foundry-rs/foundry/blob/master/crates/anvil/src/eth/fees.rs#L141
targetGasUsed := gasLimit / EIP1559_ELASTICITY_MULTIPLIER
if targetGasUsed == gasUsed {
newBaseFee = baseFee
return
}
if gasUsed > targetGasUsed {
newBaseFee = baseFee + ((baseFee*(gasUsed-targetGasUsed))/targetGasUsed)/BASE_FEE_CHANGE_DENOMINATOR
} else {
newBaseFee = baseFee - ((baseFee*(targetGasUsed-gasUsed))/targetGasUsed)/BASE_FEE_CHANGE_DENOMINATOR
}
newBaseFee += uint64(rand.Int63n(9))
return
}
还记得我们前面文章说的,面对同一个套利机会往往会有很多竞争者。给的Gas费越高,胜率越大。计算BaseFee的方法我这里参考了anvil库的算法,至于好奇那2个常量的可以去看看EIP1559协议,链接在注释里自取。
计算出来基础费用其实还是不够的,还需要加上优先费(PriorityFee,贿赂矿工的),不过这个优先费我问了大佬说是调用三方接口获取的,可能是要基于历史数据+机器学习+神经网络进行预测吧,暂时先不考虑。
如何监控内存池中Pending的交易
func subcribePendingTransaction(client *ethclient.Client) {
transactionsHash := make(chan string)
sub, _ := client.Client().EthSubscribe(context.Background(), transactionsHash, "newPendingTransactions")
for {
select {
case err := <-sub.Err():
log.Fatal(err)
case txHash := <-transactionsHash:
fmt.Println(txHash)
}
}
}
这里的逻辑和订阅最新的区块逻辑差不多,这里想多说一句的就是对于使用其他语言的小伙伴,获取Pending交易本质上就是一个特定参数的网络请求,比如:
// initiate websocket stream first
wscat -c wss://eth-mainnet.g.alchemy.com/v2/demo
// then call subscription
{"jsonrpc":"2.0","id": 2, "method": "eth_subscribe", "params": ["newPendingTransactions"]}
如果没有对应的轮子自己造一个也不是很难。
如何解析交易中Data字段
当我们拿到一个请求的哈希之后,怎么知道这个交易在做什么呢?此时就需要ABI登场了,对于大多数开源合约来说,在对应链的区块链浏览器上都可以直接看到合约对应的ABI,通常是JSON格式,比如:
此外,还可以将开源合约代码下载到本地自行编译生成。
这里以ERC20
为例来解析:
func parseERC20Transaction(client *ethclient.Client, txHash string) {
ctx, _ := os.ReadFile("./abi/ERC20.json")
erc20Abi, _ := abi.JSON(strings.NewReader(string(ctx)))
tx, isPending, err := client.TransactionByHash(context.Background(), common.HexToHash(txHash))
if err != nil {
log.Fatalln(err)
}
if len(tx.Data()) > 0 {
// 使用前4字节去找对应的函数名字
method, err := erc20Abi.MethodById(tx.Data()[:4])
if err != nil {
fmt.Println("Tx is not erc20 protocol")
} else {
fmt.Println(method.Name, isPending)
// 后面的字节就是对应函数的参数
input, err := method.Inputs.Unpack(tx.Data()[4:])
fmt.Println(input, err)
}
}
}
代码也很清晰,首先根据json内容生成abi对象,然后获取交易内容并根据data字段进行解析,获取函数名和inputData参数——也就是区块链浏览器上显示的inputData字段:
如何监控Pair创建
其实这里应该分2个部分,首先是同步历史已经创建的Pair:
func syncPairCreate(client *ethclient.Client) {
// 以太坊UniswapV2-FactoryContrace地址
address := common.HexToAddress("0x5c69bee701ef814a2b6a3edd4b1652cb9cc5aa6f")
filter := ethereum.FilterQuery{
FromBlock: big.NewInt(10000835), // 合约在这个区块部署
ToBlock: big.NewInt(10091985), // 数据过多会报错,这里限制一下
Addresses: []common.Address{address}, // 过滤只要这个合约地址的数据
}
logs, err := client.FilterLogs(context.Background(), filter)
if err != nil {
log.Fatalln(err)
}
ctx, _ := os.ReadFile("./abi/UniswapV2Factory.json")
factoryAbi, _ := abi.JSON(strings.NewReader(string(ctx)))
for _, l := range logs {
fmt.Println(l.Data, l.TxHash)
data, err := factoryAbi.Unpack("PairCreated", l.Data)
if err != nil {
log.Fatalln(err)
}
token0 := common.HexToAddress(l.Topics[1].Hex())
token1 := common.HexToAddress(l.Topics[2].Hex())
pairAddrss := data[0]
pairNum := data[1]
fmt.Println(token0, token1, pairAddrss, pairNum)
}
}
简单说,首先我们需要找到链上的合约地址以及对应的ABI,然后根据实际需求构造FilterQuery
结构,最后调用接口获取日志。在实际使用中有个需要注意的地方就是ToBlock
的值,由于这个合约已经部署很久了,如果不进行限制的话日志量过大会报错,所以真正使用时候要切分并行获取。
关于数据解析部分,这里我们获取的是事件日志,和上面解析交易的数据不同。
还是以上面的为例,UniswapV2factory
合约对应的PairCreated
事件ABI如下(注意这里是PairCreated
事件,不是createPair
函数):
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "address",
"name": "token0",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "token1",
"type": "address"
},
{
"indexed": false,
"internalType": "address",
"name": "pair",
"type": "address"
},
{
"indexed": false,
"internalType": "uint256",
"name": "",
"type": "uint256"
}
],
"name": "PairCreated",
"type": "event"
}
首先Topics
中第0个元素永远是名字+参数经过keccak计算后的值,然后indexed=true
的字段会按照顺序依次存在Topics
字段里,而值为false
的数据则会存储在Data
字段,需要使用对应的ABI进行解析。
当同步完历史数据,一般会记录在数据库或者文件里,然后开是监听新Pair创建事件:
func subcribeNewPairCreated(client *ethclient.Client) {
nameHash := crypto.Keccak256Hash([]byte("PairCreated(address,address,address,uint256)"))
filters := ethereum.FilterQuery{
FromBlock: big.NewInt(18069579), // 示例,以实际运行是获取的最新区块为准,怎么获得见上面
Topics: [][]common.Hash{
{nameHash},
},
}
ctx, _ := os.ReadFile("./abi/UniswapV2Factory.json")
factoryAbi, _ := abi.JSON(strings.NewReader(string(ctx)))
logs := make(chan types.Log)
sub, _ := client.SubscribeFilterLogs(context.Background(), filters, logs)
for {
select {
case err := <-sub.Err():
log.Fatal(err)
case l := <-logs:
fmt.Println(l.TxHash)
data, err := factoryAbi.Unpack("PairCreated", l.Data)
if err != nil {
log.Fatalln(err)
}
token0 := common.HexToAddress(l.Topics[1].Hex())
token1 := common.HexToAddress(l.Topics[2].Hex())
pairAddrss := data[0]
pairNum := data[1]
fmt.Println(token0, token1, pairAddrss, pairNum)
}
}
}
这里我们之所以能够使用Topics
作为过滤条件,就是因为上面说的所有事件日志的Topics[0]
都是函数名+参数的keccak哈希值,这样就能过滤出所有新创建的Pair了。其实使用上面基于合约地址的条件进行过滤也可以,这里主要想给出另一种过滤方式,毕竟技多不压身。
说到这,喜欢冲土狗的小伙伴是不是脑海中出现了什么想法?懂的都懂,但还是注意别有个新池子就冲,记得结合其他数据分析一下,比如下面的Reserves。
如何监控Reserves
有了上面的基础,监控Reservers就是换个事件日志和ABI的事。根据UniswapV2文档,监控UniswapV2Pair
合约中Sync
事件即可:
func subcribeSyncEvent(client *ethclient.Client) {
nameHash := crypto.Keccak256Hash([]byte("Sync(uint112,uint112)"))
filters := ethereum.FilterQuery{
FromBlock: big.NewInt(18069579), // 示例,以实际运行是获取的最新区块为准,怎么获得见上面
Topics: [][]common.Hash{
{nameHash},
},
}
ctx, _ := os.ReadFile("./abi/UniswapV2Pair.json")
pairAbi, _ := abi.JSON(strings.NewReader(string(ctx)))
logs := make(chan types.Log)
sub, _ := client.SubscribeFilterLogs(context.Background(), filters, logs)
for {
select {
case err := <-sub.Err():
log.Fatal(err)
case l := <-logs:
fmt.Println(l.TxHash)
data, err := pairAbi.Unpack("Sync", l.Data)
if err != nil {
log.Fatalln(err)
}
pairAddr := l.Address.Hex()
reserves0 := data[0]
reserves1 := data[1]
fmt.Println(pairAddr, reserves0, reserves1)
}
}
}
不多解释了,这里如果想针对于某个Pair来监控,添加Addresses
过滤条件即可。不过这里建议有兴趣的去看看ethereum.FilterQuery
支持的条件,后面可以高效的进行组合过滤。
如何调用合约
最后说说如何与合约进行交互,Python直接有ABI的JSON文件进行反序列化就可以了,而在Golang里则需要使用abigen
来生成对应的go文件。首先要找到go-ethereum
源码包的位置或者从github下载,然后进入cmd/abigen
目录去自己go build
,然后到abi
目录执行:
abigen --abi=ERC20.json --pkg=abi --out=erc20.go --alias _totalSupply=totalSupply1
注意这里的--alias
参数,由于这个程序不能自动处理下划线,不加这个参数会报错。另外不同的JSON文件生成go代码时,pkg
参数也不能一样,否则会报函数已存在一类的错误,这点真是比Python差远了。
生成go文件后就当普通的包使用,如果报错无法import一类的,执行go mod tidy
即可。
// import erc20 "gevm/abi"
func getTokenDecimals(client *ethclient.Client, tokenAddr string) {
tokenAddr := common.HexToAddress(tokenAddr)
e, _ := erc20.NewErc20(tokenAddr, client)
i, _ := e.Decimals(nil)
fmt.Println(i)
}
上面的例子就是传入一个token的合约地址,然后调用Decimals()
获取精度,其他的用法都差不多不多说了。
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK