13

Web3漫游记——MEV机器人实践1

 10 months ago
source link: https://www.hi-roy.com/posts/web3%E6%BC%AB%E6%B8%B8%E8%AE%B0mev%E6%9C%BA%E5%99%A8%E4%BA%BA%E5%AE%9E%E8%B7%B51/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Web3漫游记——MEV机器人实践1

2023-09-11

今天开始进入技术同学最喜欢的代码环节,虽然前一篇文章《Web3漫游记——MEV套利技能树》主推Rust作为首选,但后续文章会以Golang为主。主要因为个人技术栈是Golang+Python,用熟悉的语言可以更专注于MEV相关逻辑实现。而使用Golang而非Python则是为了顺便熟悉一下Geth这个库,为了后面做私有节点优化提前做一些准备。有其他语言编程基础的小伙伴看懂应该都不是问题,只不过用到的库以及语法可能略有差别而已。

另外提醒一下Python技术栈的小伙伴,截止到发文日期,Python3.11web3.py-v6.9web3-flashbots-v1.1.1的SDK目前有兼容性问题,使用python3.8可正常测试。

首先简单看一下目录结构:


├── README.md
├── .env
├── abi
│   ├── ERC20.json
│   ├── UniswapV2Factory.json
│   ├── UniswapV2Pair.json
│   ├── V2ArbBot.json
│   ├── WETH.json
│   └── erc20.go
├── go.mod
├── go.sum
└── main.go

这里为了简(tou)单(lan)就都写到main.go里了,并且没考虑各种异常情况和优化,大家真正写工程级别代码时注意一下细节。

如何建立链接

package main

import (
	"context"
	"fmt"
	erc20 "gevm/abi"
	"log"
	"math/big"
	"math/rand"
	"os"
	"strings"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/accounts/abi"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/ethclient"
	_ "github.com/joho/godotenv/autoload"
)

func main() {
	client, err := ethclient.Dial(os.Getenv("WSS_URL"))
	if err != nil {
		log.Fatal(err)
	}
	log.Println("connected!")
}

其中autoload库会自动加载当前目录下.env文件到环境变量中,.env内容就是节点服务的http和ws连接地址,比如:

HTTP_URL=https://eth-mainnet.g.alchemy.com/v2/{APIKEY}
WSS_URL=wss://eth-mainnet.g.alchemy.com/v2/{APIKEY}

不要把任何APIKEY、私钥相关的硬编码到程序中,是一个非常重要的习惯。

如何监控最新的区块

func subcribeNewBlock(client *ethclient.Client) {
	headers := make(chan *types.Header)
	sub, err := client.SubscribeNewHead(context.Background(), headers)
	if err != nil {
		log.Fatal(err)
	}
	for {
		select {
		case err := <-sub.Err():
			log.Fatal(err)
		case header := <-headers:
			fmt.Println(header.Hash())
			calcNextBlockBaseFee(header.GasUsed, header.GasLimit, header.BaseFee.Uint64())
		}
	}
}

由于我们的client使用的是ws连接,所以当有新的区块生成,headers这个channel就会收到数据,其定义如下:

// Header represents a block header in the Ethereum blockchain.
type Header struct {
	ParentHash  common.Hash    `json:"parentHash"       gencodec:"required"`
	UncleHash   common.Hash    `json:"sha3Uncles"       gencodec:"required"`
	Coinbase    common.Address `json:"miner"`
	Root        common.Hash    `json:"stateRoot"        gencodec:"required"`
	TxHash      common.Hash    `json:"transactionsRoot" gencodec:"required"`
	ReceiptHash common.Hash    `json:"receiptsRoot"     gencodec:"required"`
	Bloom       Bloom          `json:"logsBloom"        gencodec:"required"`
	Difficulty  *big.Int       `json:"difficulty"       gencodec:"required"`
	Number      *big.Int       `json:"number"           gencodec:"required"`
	GasLimit    uint64         `json:"gasLimit"         gencodec:"required"`
	GasUsed     uint64         `json:"gasUsed"          gencodec:"required"`
	Time        uint64         `json:"timestamp"        gencodec:"required"`
	Extra       []byte         `json:"extraData"        gencodec:"required"`
	MixDigest   common.Hash    `json:"mixHash"`
	Nonce       BlockNonce     `json:"nonce"`

	// BaseFee was added by EIP-1559 and is ignored in legacy headers.
	BaseFee *big.Int `json:"baseFeePerGas" rlp:"optional"`

	// WithdrawalsHash was added by EIP-4895 and is ignored in legacy headers.
	WithdrawalsHash *common.Hash `json:"withdrawalsRoot" rlp:"optional"`

	// BlobGasUsed was added by EIP-4844 and is ignored in legacy headers.
	BlobGasUsed *uint64 `json:"blobGasUsed" rlp:"optional"`

	// ExcessBlobGas was added by EIP-4844 and is ignored in legacy headers.
	ExcessBlobGas *uint64 `json:"excessBlobGas" rlp:"optional"`
}

上面代码中我们简单的输出了一下区块的哈希值,然后去预估下一个区块的BaseGasFee。如果需要当前区块的所有交易信息,可以调用client.BlockByHash(context.Background(), header.Hash())函数,这里就不展开了。

如何估算下一个区块的基础GasFee

const (
	EIP1559_ELASTICITY_MULTIPLIER = 2
	BASE_FEE_CHANGE_DENOMINATOR   = 8
)

func calcNextBlockBaseFee(gasUsed, gasLimit, baseFee uint64) (newBaseFee uint64) {
	// https://github.com/ethereum/EIPs/blob/master/EIPS/eip-1559.md
	// https://github.com/foundry-rs/foundry/blob/master/crates/anvil/src/eth/fees.rs#L141
	targetGasUsed := gasLimit / EIP1559_ELASTICITY_MULTIPLIER
	if targetGasUsed == gasUsed {
		newBaseFee = baseFee
		return
	}
	if gasUsed > targetGasUsed {
		newBaseFee = baseFee + ((baseFee*(gasUsed-targetGasUsed))/targetGasUsed)/BASE_FEE_CHANGE_DENOMINATOR
	} else {
		newBaseFee = baseFee - ((baseFee*(targetGasUsed-gasUsed))/targetGasUsed)/BASE_FEE_CHANGE_DENOMINATOR
	}
	newBaseFee += uint64(rand.Int63n(9))
	return
}

还记得我们前面文章说的,面对同一个套利机会往往会有很多竞争者。给的Gas费越高,胜率越大。计算BaseFee的方法我这里参考了anvil库的算法,至于好奇那2个常量的可以去看看EIP1559协议,链接在注释里自取。

计算出来基础费用其实还是不够的,还需要加上优先费(PriorityFee,贿赂矿工的),不过这个优先费我问了大佬说是调用三方接口获取的,可能是要基于历史数据+机器学习+神经网络进行预测吧,暂时先不考虑。

如何监控内存池中Pending的交易

func subcribePendingTransaction(client *ethclient.Client) {
	transactionsHash := make(chan string)
	sub, _ := client.Client().EthSubscribe(context.Background(), transactionsHash, "newPendingTransactions")
	for {
		select {
		case err := <-sub.Err():
			log.Fatal(err)
		case txHash := <-transactionsHash:
			fmt.Println(txHash)
		}
	}
}

这里的逻辑和订阅最新的区块逻辑差不多,这里想多说一句的就是对于使用其他语言的小伙伴,获取Pending交易本质上就是一个特定参数的网络请求,比如:

// initiate websocket stream first
wscat -c wss://eth-mainnet.g.alchemy.com/v2/demo

// then call subscription
{"jsonrpc":"2.0","id": 2, "method": "eth_subscribe", "params": ["newPendingTransactions"]}

如果没有对应的轮子自己造一个也不是很难。

如何解析交易中Data字段

当我们拿到一个请求的哈希之后,怎么知道这个交易在做什么呢?此时就需要ABI登场了,对于大多数开源合约来说,在对应链的区块链浏览器上都可以直接看到合约对应的ABI,通常是JSON格式,比如:

abi

此外,还可以将开源合约代码下载到本地自行编译生成。

这里以ERC20为例来解析:

func parseERC20Transaction(client *ethclient.Client, txHash string) {
	ctx, _ := os.ReadFile("./abi/ERC20.json")
	erc20Abi, _ := abi.JSON(strings.NewReader(string(ctx)))
	tx, isPending, err := client.TransactionByHash(context.Background(), common.HexToHash(txHash))
	if err != nil {
		log.Fatalln(err)
	}
	if len(tx.Data()) > 0 {
		// 使用前4字节去找对应的函数名字
		method, err := erc20Abi.MethodById(tx.Data()[:4])
		if err != nil {
			fmt.Println("Tx is not erc20 protocol")
		} else {
			fmt.Println(method.Name, isPending)
			// 后面的字节就是对应函数的参数
			input, err := method.Inputs.Unpack(tx.Data()[4:])
			fmt.Println(input, err)
		}
	}
}

代码也很清晰,首先根据json内容生成abi对象,然后获取交易内容并根据data字段进行解析,获取函数名和inputData参数——也就是区块链浏览器上显示的inputData字段:

inputdata

如何监控Pair创建

其实这里应该分2个部分,首先是同步历史已经创建的Pair:

func syncPairCreate(client *ethclient.Client) {
	// 以太坊UniswapV2-FactoryContrace地址
	address := common.HexToAddress("0x5c69bee701ef814a2b6a3edd4b1652cb9cc5aa6f")
	filter := ethereum.FilterQuery{
		FromBlock: big.NewInt(10000835),      // 合约在这个区块部署
		ToBlock:   big.NewInt(10091985),      // 数据过多会报错,这里限制一下
		Addresses: []common.Address{address}, // 过滤只要这个合约地址的数据
	}
	logs, err := client.FilterLogs(context.Background(), filter)
	if err != nil {
		log.Fatalln(err)
	}
	ctx, _ := os.ReadFile("./abi/UniswapV2Factory.json")
	factoryAbi, _ := abi.JSON(strings.NewReader(string(ctx)))
	for _, l := range logs {
		fmt.Println(l.Data, l.TxHash)
		data, err := factoryAbi.Unpack("PairCreated", l.Data)
		if err != nil {
			log.Fatalln(err)
		}
		token0 := common.HexToAddress(l.Topics[1].Hex())
		token1 := common.HexToAddress(l.Topics[2].Hex())
		pairAddrss := data[0]
		pairNum := data[1]
		fmt.Println(token0, token1, pairAddrss, pairNum)
	}
}

简单说,首先我们需要找到链上的合约地址以及对应的ABI,然后根据实际需求构造FilterQuery结构,最后调用接口获取日志。在实际使用中有个需要注意的地方就是ToBlock的值,由于这个合约已经部署很久了,如果不进行限制的话日志量过大会报错,所以真正使用时候要切分并行获取。

关于数据解析部分,这里我们获取的是事件日志,和上面解析交易的数据不同。

还是以上面的为例,UniswapV2factory合约对应的PairCreated事件ABI如下(注意这里是PairCreated事件,不是createPair函数):

{
	"anonymous": false,
	"inputs": [
		{
			"indexed": true,
			"internalType": "address",
			"name": "token0",
			"type": "address"
		},
		{
			"indexed": true,
			"internalType": "address",
			"name": "token1",
			"type": "address"
		},
		{
			"indexed": false,
			"internalType": "address",
			"name": "pair",
			"type": "address"
		},
		{
			"indexed": false,
			"internalType": "uint256",
			"name": "",
			"type": "uint256"
		}
	],
	"name": "PairCreated",
	"type": "event"
}

首先Topics中第0个元素永远是名字+参数经过keccak计算后的值,然后indexed=true的字段会按照顺序依次存在Topics字段里,而值为false的数据则会存储在Data字段,需要使用对应的ABI进行解析。

当同步完历史数据,一般会记录在数据库或者文件里,然后开是监听新Pair创建事件:

func subcribeNewPairCreated(client *ethclient.Client) {
	nameHash := crypto.Keccak256Hash([]byte("PairCreated(address,address,address,uint256)"))
	filters := ethereum.FilterQuery{
		FromBlock: big.NewInt(18069579), // 示例,以实际运行是获取的最新区块为准,怎么获得见上面
		Topics: [][]common.Hash{
			{nameHash},
		},
	}
	ctx, _ := os.ReadFile("./abi/UniswapV2Factory.json")
	factoryAbi, _ := abi.JSON(strings.NewReader(string(ctx)))
	logs := make(chan types.Log)
	sub, _ := client.SubscribeFilterLogs(context.Background(), filters, logs)
	for {
		select {
		case err := <-sub.Err():
			log.Fatal(err)
		case l := <-logs:
			fmt.Println(l.TxHash)
			data, err := factoryAbi.Unpack("PairCreated", l.Data)
			if err != nil {
				log.Fatalln(err)
			}
			token0 := common.HexToAddress(l.Topics[1].Hex())
			token1 := common.HexToAddress(l.Topics[2].Hex())
			pairAddrss := data[0]
			pairNum := data[1]
			fmt.Println(token0, token1, pairAddrss, pairNum)
		}
	}
}

这里我们之所以能够使用Topics作为过滤条件,就是因为上面说的所有事件日志的Topics[0]都是函数名+参数的keccak哈希值,这样就能过滤出所有新创建的Pair了。其实使用上面基于合约地址的条件进行过滤也可以,这里主要想给出另一种过滤方式,毕竟技多不压身。

说到这,喜欢冲土狗的小伙伴是不是脑海中出现了什么想法?懂的都懂,但还是注意别有个新池子就冲,记得结合其他数据分析一下,比如下面的Reserves。

如何监控Reserves

有了上面的基础,监控Reservers就是换个事件日志和ABI的事。根据UniswapV2文档,监控UniswapV2Pair合约中Sync事件即可:

func subcribeSyncEvent(client *ethclient.Client) {
	nameHash := crypto.Keccak256Hash([]byte("Sync(uint112,uint112)"))
	filters := ethereum.FilterQuery{
		FromBlock: big.NewInt(18069579), // 示例,以实际运行是获取的最新区块为准,怎么获得见上面
		Topics: [][]common.Hash{
			{nameHash},
		},
	}
	ctx, _ := os.ReadFile("./abi/UniswapV2Pair.json")
	pairAbi, _ := abi.JSON(strings.NewReader(string(ctx)))
	logs := make(chan types.Log)
	sub, _ := client.SubscribeFilterLogs(context.Background(), filters, logs)
	for {
		select {
		case err := <-sub.Err():
			log.Fatal(err)
		case l := <-logs:
			fmt.Println(l.TxHash)
			data, err := pairAbi.Unpack("Sync", l.Data)
			if err != nil {
				log.Fatalln(err)
			}
			pairAddr := l.Address.Hex()
			reserves0 := data[0]
			reserves1 := data[1]
			fmt.Println(pairAddr, reserves0, reserves1)
		}
	}
}

不多解释了,这里如果想针对于某个Pair来监控,添加Addresses过滤条件即可。不过这里建议有兴趣的去看看ethereum.FilterQuery支持的条件,后面可以高效的进行组合过滤。

如何调用合约

最后说说如何与合约进行交互,Python直接有ABI的JSON文件进行反序列化就可以了,而在Golang里则需要使用abigen来生成对应的go文件。首先要找到go-ethereum源码包的位置或者从github下载,然后进入cmd/abigen目录去自己go build,然后到abi目录执行:

abigen --abi=ERC20.json  --pkg=abi --out=erc20.go --alias _totalSupply=totalSupply1

注意这里的--alias参数,由于这个程序不能自动处理下划线,不加这个参数会报错。另外不同的JSON文件生成go代码时,pkg参数也不能一样,否则会报函数已存在一类的错误,这点真是比Python差远了。

生成go文件后就当普通的包使用,如果报错无法import一类的,执行go mod tidy即可。

// import erc20 "gevm/abi"
func getTokenDecimals(client *ethclient.Client, tokenAddr string) {
	tokenAddr := common.HexToAddress(tokenAddr)
	e, _ := erc20.NewErc20(tokenAddr, client)
	i, _ := e.Decimals(nil)
	fmt.Println(i)
}

上面的例子就是传入一个token的合约地址,然后调用Decimals()获取精度,其他的用法都差不多不多说了。


wechat


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK