5

redis之key过期源码分析

 2 years ago
source link: https://wakzz.cn/2019/07/08/redis/redis%E4%B9%8Bkey%E8%BF%87%E6%9C%9F%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

redis之key过期源码分析

祈雨的博客
2019-07-08

redis的所有数据结构都可以设置过期时间,当key过期后再查询该key返回null。

redis实现key自动过期是通过额外保存需要自动过期的key和该key的过期时间,然后通过主动删除和定时任务删除两种机制来将过期的key移除并回收内存。在redis4.0版本引入了异步删除的机制,对于删除对象大小大于64字节的key,先通过Unlink方法软删除后放入回收队列中,由其他线程异步回收内存空间,减少主线程的在内存回收上消耗的时间。

typedef struct redisDb {
dict *dict; // 保存key-value数据
dict *expires; // 保存等待过期时间的key和对应的过期时间
dict *blocking_keys;
dict *ready_keys;
dict *watched_keys;
int id;
long long avg_ttl;
list *defrag_later;
} redisDb;

image

redis添加自动过期的key时,先向hash结构数据redisDb.dict添加该key和key对应的值。添加成功后,调用setExpire方法向hash结构数据redisDb.expires添加该key和key对应的过期时间,过期时间的单位为毫秒。

void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
long long milliseconds = 0; /* initialized to avoid any harmness warning */

// 计算key的过期时间
if (expire) {
if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
return;
if (milliseconds <= 0) {
addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
return;
}
if (unit == UNIT_SECONDS) milliseconds *= 1000;
}

if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
(flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
{
addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
return;
}
// 添加key
setKey(c->db,key,val);
server.dirty++;
// 给key设置过期时间
if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
"expire",key,c->db->id);
addReply(c, ok_reply ? ok_reply : shared.ok);
}

void setExpire(client *c, redisDb *db, robj *key, long long when) {
dictEntry *kde, *de;

// 从db.dict查询key,校验该key已经保存到redis
kde = dictFind(db->dict,key->ptr);
serverAssertWithInfo(NULL,key,kde != NULL);
// 向hash表db.expires添加key和key的过期时间,用于过期删除key
de = dictAddOrFind(db->expires,dictGetKey(kde));
dictSetSignedIntegerVal(de,when);

int writable_slave = server.masterhost && server.repl_slave_ro == 0;
if (c && writable_slave && !(c->flags & CLIENT_MASTER))
rememberSlaveKeyWithExpire(db,key);
}

过期key的删除回收分为主动删除和定时删除两种策略:

  • 主动删除:对key删改查操作时先判断该key是否已过期,如过期则删除回收该key
  • 定时删除:主线程每100毫秒从redisDb.expires中随机选择20个key,删除其中已过期的key。如果过期的key的比例超过1/4则重复该操作直到达到时间上限(默认25毫秒)

对于key的删改查操作,先查询hash表redisDb.expires判断该key是否已经过期,如果key已过期,则默认调用dbAsyncDelete方法异步删除回收该key的内存空间;若key未过期,继续进行删改查操作。

另外在redis主从架构下,从节点不处理过期机制,通过等待主节点的指令直接删除对应的key。因此当网络延迟较大时,存在主节点中已过期的key能在从节点查询出来的问题。

robj *lookupKeyRead(redisDb *db, robj *key) {
// 查询操作
return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
}

robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
robj *val;

// 查询key的值前先检查该key是否已过期
// 如果key过期则删除回收该key
if (expireIfNeeded(db,key) == 1) {
if (server.masterhost == NULL) {
server.stat_keyspace_misses++;
notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
return NULL;
}

if (server.current_client &&
server.current_client != server.master &&
server.current_client->cmd &&
server.current_client->cmd->flags & CMD_READONLY)
{
server.stat_keyspace_misses++;
notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
return NULL;
}
}
// key未过期,查询key对应的值
val = lookupKey(db,key,flags);
if (val == NULL) {
server.stat_keyspace_misses++;
notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
}
else
server.stat_keyspace_hits++;
return val;
}

int expireIfNeeded(redisDb *db, robj *key) {
// 查询redisDb.expires判断该key是否过期
if (!keyIsExpired(db,key)) return 0;

// 从节点不处理过期机制,等待主节点的指令直接删除对应的key
// 因此存在网络延迟大时,已过期的key能在从节点查询出来的问题
if (server.masterhost != NULL) return 1;

server.stat_expiredkeys++;
propagateExpire(db,key,server.lazyfree_lazy_expire);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);
// 默认使用dbAsyncDelete异步删除回收
return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
}

databasesCron方法由主线程每100毫秒调用一次,其中的activeExpireCycle方法则是删除回收过期key的关键方法,其入参有两个值:ACTIVE_EXPIRE_CYCLE_FASTACTIVE_EXPIRE_CYCLE_SLOW

  • ACTIVE_EXPIRE_CYCLE_FAST:表示快速删除回收过期key,该场景下每次删除回收的时间上限为1毫秒,当主线程处理完外部请求后等待新请求前阻塞时会使用该参数;
  • ACTIVE_EXPIRE_CYCLE_SLOW:表示慢删除回收过期key,该场景下每次删除回收的时间上限为25毫秒,当主线程每100毫秒执行定时任务时使用该参数;

由于主线程每100毫秒会调用一次activeExpireCycle方法回收过期key,因此存在极端情况下同一时刻redis中大量数据同时过期,会导致每100毫秒一次的定时任务activeExpireCycle需要花费25毫秒的时间删除回收过期key,从而出现客户端请求等待阻塞的情况。

void databasesCron(void) {
if (server.active_expire_enabled) {
if (server.masterhost == NULL) {
// 定时任务使用慢扫描回收,扫描回收的时间上限为25毫秒
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else {
expireSlaveKeys();
}
}
...
}

#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20
void activeExpireCycle(int type) {
...

// timelimit = 1000000*25/10/100 = 25_000微妙 = 25毫秒
timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
timelimit_exit = 0;
if (timelimit <= 0) timelimit = 1;

// 快速扫描回收,扫描回收时间上限为1毫秒
// 慢扫描回收,扫描回收时间上限默认timelimit = 25毫秒
if (type == ACTIVE_EXPIRE_CYCLE_FAST)
timelimit = ACTIVE_EXPIRE_CYCLE_FASTf_DURATION; /* in microseconds. */

long total_sampled = 0;
long total_expired = 0;

for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
int expired;
redisDb *db = server.db+(current_db % server.dbnum);

current_db++;

do {
unsigned long num, slots;
long long now, ttl_sum;
int ttl_samples;
iteration++;

if ((num = dictSize(db->expires)) == 0) {
db->avg_ttl = 0;
break;
}
slots = dictSlots(db->expires);
now = mstime();

if (num && slots > DICT_HT_INITIAL_SIZE &&
(num*100/slots < 1)) break;

expired = 0;
ttl_sum = 0;
ttl_samples = 0;

// 每次从redisDb.expires随机获取20个key
if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;

while (num--) {
dictEntry *de;
long long ttl;

// 从redisDb.expires随机获取一个key
if ((de = dictGetRandomKey(db->expires)) == NULL) break;
ttl = dictGetSignedIntegerVal(de)-now;
// 如果key已过期,默认调用dbAsyncDelete异步删除回收,并累计过期的key数量
if (activeExpireCycleTryExpire(db,de,now)) expired++;
if (ttl > 0) {
ttl_sum += ttl;
ttl_samples++;
}
total_sampled++;
}
total_expired += expired;

if (ttl_samples) {
long long avg_ttl = ttl_sum/ttl_samples;

if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
}

// 如果过期key的扫描回收时间达到了上限,则结束此次扫描回收操作
if ((iteration & 0xf) == 0) {
elapsed = ustime()-start;
if (elapsed > timelimit) {
timelimit_exit = 1;
server.stat_expired_time_cap_reached_count++;
break;
}
}
// 每次随机获取的20个key如果超过1/4已经过期,则重复操作删除回收过期的key
} while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
}
...
}

在redis4.0版本默认调用dbAsyncDelete方法删除回收key,但实际上dbAsyncDelete会判断该key的对象大小,如果key的对象大小超过64字节时才会真正使用异步删除逻辑,将该key放入BIO队列由其他线程删除回收内存空间;否则仍然使用同步删除逻辑直接回收内存空间。

#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

dictEntry *de = dictUnlink(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);
size_t free_effort = lazyfreeGetFreeEffort(val);

// 如果待删除key的长度超过64字节,异步删除
if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
atomicIncr(lazyfree_objects,1);
// 放入BIO队列中由其他进程回收内存空间
bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
dictSetVal(db->dict,de,NULL);
}
}

// 待删除key长度不超过64字节,依然使用同步删除
if (de) {
dictFreeUnlinkedEntry(db->dict,de); // 同步回收内存空间
if (server.cluster_enabled) slotToKeyDel(key);
return 1;
} else {
return 0;
}
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK