最佳实践|如何使用c++开发redis module
阿里妹导读
简介
原理
1.Redis内核会暴露出/导出很多API给module使用(如内存分配接口、redis核心db结构的操作接口),注意这些API是redis自己解析绑定的,而不是靠动态连接器解析的。
加载
int moduleLoad(const char *path, void **module_argv, int module_argc, int is_loadex) {
int (*onload)(void *, void **, int);
void *handle;
struct stat st;
if (stat(path, &st) == 0) {
/* This check is best effort */
if (!(st.st_mode & (S_IXUSR | S_IXGRP | S_IXOTH))) {
serverLog(LL_WARNING, "Module %s failed to load: It does not have execute permissions.", path);
return C_ERR;
}
}
// 打开module so
handle = dlopen(path,RTLD_NOW|RTLD_LOCAL);
if (handle == NULL) {
serverLog(LL_WARNING, "Module %s failed to load: %s", path, dlerror());
return C_ERR;
}
// 获取module中的onload函数符号地址
onload = (int (*)(void *, void **, int))(unsigned long) dlsym(handle,"RedisModule_OnLoad");
if (onload == NULL) {
dlclose(handle);
serverLog(LL_WARNING,
"Module %s does not export RedisModule_OnLoad() "
"symbol. Module not loaded.",path);
return C_ERR;
}
RedisModuleCtx ctx;
moduleCreateContext(&ctx, NULL, REDISMODULE_CTX_TEMP_CLIENT); /* We pass NULL since we don't have a module yet. */
// 调用onload对module进行初始化
if (onload((void*)&ctx,module_argv,module_argc) == REDISMODULE_ERR) {
serverLog(LL_WARNING,
"Module %s initialization failed. Module not loaded",path);
if (ctx.module) {
moduleUnregisterCommands(ctx.module);
moduleUnregisterSharedAPI(ctx.module);
moduleUnregisterUsedAPI(ctx.module);
moduleRemoveConfigs(ctx.module);
moduleFreeModuleStructure(ctx.module);
}
moduleFreeContext(&ctx);
dlclose(handle);
return C_ERR;
}
/* Redis module loaded! Register it. */
//... 无关代码省略 ...
moduleFreeContext(&ctx);
return C_OK;
}
API 绑定
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (RedisModule_Init(ctx, "helloworld", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
// ... 无关代码省略 ...
}
static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) {
void *getapifuncptr = ((void**)ctx)[0];
RedisModule_GetApi = (int (*)(const char *, void *)) (unsigned long)getapifuncptr;
// 绑定redis导出的api
REDISMODULE_GET_API(Alloc);
REDISMODULE_GET_API(TryAlloc);
REDISMODULE_GET_API(Calloc);
REDISMODULE_GET_API(Free);
REDISMODULE_GET_API(Realloc);
REDISMODULE_GET_API(Strdup);
REDISMODULE_GET_API(CreateCommand);
REDISMODULE_GET_API(GetCommand);
// ... 无关代码省略 ...
}
RedisModule_GetApi("RedisModule_"
void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_flags) {
memset(out_ctx, 0 ,sizeof(RedisModuleCtx));
// 这里把GetApi地址传递给module
out_ctx->getapifuncptr = (void*)(unsigned long)&RM_GetApi;
out_ctx->module = module;
out_ctx->flags = ctx_flags;
// ... 无关代码省略 ...
}
void *getapifuncptr = ((void**)ctx)[0];
RedisModule_GetApi = (int (*)(const char *, void *)) (unsigned long)getapifuncptr;
struct RedisModuleCtx {
// getapifuncptr是第一个成员
void *getapifuncptr; /* NOTE: Must be the first field. */
struct RedisModule *module; /* Module reference. */
client *client; /* Client calling a command. */
// ... 无关代码省略 ...
};
搞清楚了RM_GetApi是怎么被导出的原理后,我们来接着看下RM_GetApi内部在做什么:
int RM_GetApi(const char *funcname, void **targetPtrPtr) {
/* Lookup the requested module API and store the function pointer into the
* target pointer. The function returns REDISMODULE_ERR if there is no such
* named API, otherwise REDISMODULE_OK.
*
* This function is not meant to be used by modules developer, it is only
* used implicitly by including redismodule.h. */
dictEntry *he = dictFind(server.moduleapi, funcname);
if (!he) return REDISMODULE_ERR;
*targetPtrPtr = dictGetVal(he);
return REDISMODULE_OK;
}
/* Register all the APIs we export. Keep this function at the end of the
* file so that's easy to seek it to add new entries. */
void moduleRegisterCoreAPI(void) {
server.moduleapi = dictCreate(&moduleAPIDictType);
server.sharedapi = dictCreate(&moduleAPIDictType);
// 向全局哈希表中注册函数
REGISTER_API(Alloc);
REGISTER_API(TryAlloc);
REGISTER_API(Calloc);
REGISTER_API(Realloc);
REGISTER_API(Free);
REGISTER_API(Strdup);
REGISTER_API(CreateCommand);
// ... 无关代码省略 ...
}
int moduleRegisterApi(const char *funcname, void *funcptr) {
return dictAdd(server.moduleapi, (char*)funcname, funcptr);
}
moduleRegisterApi("RedisModule_"
一些最佳实践
入口函数禁用c++ mangle
extern "C" __attribute__((visibility("default"))) int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
// Init code and command register
return REDISMODULE_OK;
}
接管内存统计
REDISMODULE_API void * (*RedisModule_Alloc)(size_t bytes) REDISMODULE_ATTR;
REDISMODULE_API void * (*RedisModule_Realloc)(void *ptr, size_t bytes) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_Free)(void *ptr) REDISMODULE_ATTR;
REDISMODULE_API void * (*RedisModule_Calloc)(size_t nmemb, size_t size) REDISMODULE_ATTR;
new/operator new/placement new
1.分配空间(使用operator new)
2.初始化对象(使用placement new或者类型强转),即调用对象的构造函数
void * operator new(size_t, void *location) {
return location;
}
可见,要想实现修改new默认使用的内存分配,我们可以使用两种方式。
placement new
Object *p=(Object*)RedisModule_Alloc(sizeof(Object));
new (p)Object();
同时注意析构时也需要特殊处理:
p->~Object();
RedisModule_Free(p);
operator new
_GLIBCXX_WEAK_DEFINITION void *
operator new (std::size_t sz) _GLIBCXX_THROW (std::bad_alloc)
{
void *p;
/* malloc (0) is unpredictable; avoid it. */
if (sz == 0)
sz = 1;
while (__builtin_expect ((p = malloc (sz)) == 0, false))
{
new_handler handler = std::get_new_handler ();
if (! handler)
_GLIBCXX_THROW_OR_ABORT(bad_alloc());
handler ();
}
return p;
}
void *operator new(std::size_t size) {
return RedisModule_Alloc(size);
}
void operator delete(void *ptr) noexcept {
RedisModule_Free(ptr);
}
operator new在多个module之间的可见性
静态链接/动态链接c++标准库
静态链接
动态链接
使用block机制提高并发处理能力
图1 典型的异步处理模型
block虽然看上去很美好很强大,但是需要小心处理一些坑,如:
命令虽然异步执行了,但是写AOF和向备库复制依然同步做。如果提前写AOF并向备库复制,万一后面命令执行失败了就无法回滚; 因为备库是不允许执行block命令的,因此主库需要将block类型的命令rewrite成非block类型的命令复制给备库; 异步执行时,在open一个key时不能只看keyname,因为可能在异步线程执行之前,原来的key已经被删除了,然后又有一个同名的key被创建,即当前看到的key已经不是原来的key了; 设计好block类型的命令是否支持事务和lua; 如果采用线程池,需要注意相同key在线程池中的保序执行问题(即相同key的处理不能乱序);
避免和其他Module符号冲突
因为redis可以同时加载多个module,这些module可能来自不同的团队和个人,因此存在一定的概率,不同的module会定义相同的函数名。为了避免符号冲突导致的未定义行为,建议每个module都把除了Onload和Unload函数之外的符号都隐藏掉,可以在给编译器传递一些flag实现。如gcc:
-fvisibility=hidden
小心Fork陷阱
处理inflight状态的命令
如果module采用异步执行模型(参看前文block一节),那么当redis做aofrewrite或bgsave时,在redis fork子进程的瞬间,如果还有一些命令处于inflight状态,那么此时新产生的base aof或者rdb可能并不会包含这些inflight时的数据,虽然这个看上去也没有太大问题,因为inflight的命令最终完成时也会把命令写入增量的aof中。但是,为了和redis原来的行为兼容(即fork时一定没有处于inflight状态的命令,是一个静止的状态),module最好还是保证所有的inflight状态的命令都执行完了再执行fork。
在module中可以通过redis暴露的RedisModuleEvent_ForkChild事件,在fork执行之前执行一个我们传入的回调函数。
RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_ForkChild, waitAllInflightTaskFinish);
比如在waitAllInflightTaskFinish中等待队列为空(即所有task都执行结束):
static void waitAllInflightTaskFinish() {
while (!thread_pool->idle())
;
}
int pthread_atfork(void (*prepare)(void), void (*parent)void(), void (*child)(void));
避免死锁
我们知道通过fork创建的一个子进程几乎但不完全与父进程相同。子进程得到与父进程用户级虚拟地址空间相同的(但是独立的)一份拷贝,包括文本、数据和bss段、堆以及用户栈等。子进程还获得与父进程任何打开文件描述符相同的拷贝,这就意味着子进程可以读写父进程中任何打开的文件,父进程和子进程之间最大的区别在于它们有着不同的PID。
但是有一点需要注意的是,在Linux中,fork的时候只复制当前线程到子进程,在fork(2)-Linux Man Page中有着这样一段相关的描述:
The child process is created with a single thread--the one that called fork(). The entire virtual address space of the parent is replicated in the child, including the states of mutexes, condition variables, and other pthreads objects; the use of pthread_atfork(3) may be helpful for dealing with problems that this can cause.
也就是说除了调用fork的线程外,其他线程在子进程中“蒸发”了。
因此,如果在一些异步线程中持有了一些资源的锁,那么在子进程中,因为这些线程消失了,那么子进程可能会发生死锁的问题。
解决方法和解决inflight一样,保证在fork之前所有的锁都释放掉即可。(其实只要所有inflight状态的命令都执行完了,一般锁也就都释放了)
确保向备库复制的AOF保持语义幂等
Redis的主备复制首要目标就是保证主备的一致性。因此备库要做的就是无条件接收来自主库的复制内容,并严格保持一致。但是对于一些比较特殊的命令而言,需要小心处理。
以Tair暴露的Tair String为例,支持给数据设置版本号,比如用户写入:
EXSET key value VER 10
EXSET key value ABS 11
支持graceful shutdown
Module内部可能会启动一些异步线程或者管理一些异步资源,这些资源需要在redis shutdown时被处理(如停止、析构、写磁盘等),否则redis在退出时可能发生coredump。
在redis中,可以注册RedisModuleEvent_Shutdown事件实现,当redis关机时会回调我们传入的ShutdownCallback。
当然,在较新的redis版本中,module也可以通过暴露unload函数来实现类似的功能。
RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_Shutdown, ShutdownCallback);
实现aof文件压缩功能,如将一个hash的所有写操作重写为一条hmset命令(也可能是多条); 避免重写后的一条aof过大(如超过500MB),如果超过,则需要rewrite成多条cmd,同时需要确保这些多条cmd是否需要以事务的方式执行(即需要操作命令执行的隔离性); 对于一些复杂结构,无法简单重写为已有命令的module,可以单独实现一个“内部”命令,如xxxload/xxxdump等,用于实现对该module数据结构的序列化和反序列化,该命令不会对外暴露给客户端; RedisModule_EmitAOF中如果包含array类型的参数(即使用'v' flag传递的参数),则array的长度一定要使用size_t类型,否则可能会遇到诡异的错误;
RDB编码具有向后兼容能力
RDB是二进制格式的序列化和反序列化,因此相对而言比较简单。但是需要注意的是,如果数据结构以后的序列化方式可能会改变,则最好加上编解码的版本,这样在升级的时候可以保证兼容性,如下:
void *xxx_RdbLoad(RedisModuleIO *rdb, int encver) {
if (encver == version1 ) {
/* version1 format */
} else if (encver == version2 ){
/* version2 format */
}
}
一些命令实现的建议
参数检验:尽量在命令开始处对参数合法性(如参数个数是否正确、参数类型是否正确等)进行校验,尽量避免命令没有成功执行的情况下提前污染了keyspace(如提前使用了RedisModule_ModuleTypeSetValue修改主数据库) 错误信息:返回的错误信息应尽可能简单明了,阐明错误类型是什么 响应类型保持统一:注意命令在各种情况下的返回类型要统一,如key不存在、key类型错误、执行成功以及一些参数错误时的响应类型。通常情况下,除了返回错误类型之外,其他的所有情况都应该返回相同类型,如都返回一个简单字符串、或者都返回一个数组(哪怕是一个空数组)。这样客户端在解析命令返回值时比较方便 确认读写类型:命令应严格区分读写类型,这涉及到该命令能否在replica上执行、以及该命令是否需要进行同步、写aof等 复制幂等性和AOF:对于写命令,需要自行使用RedisModule_ReplicateVerbatim或者RedisModule_Replicate进行主备复制和写AOF(必要的时候需要对原命令进程重写)。其中,使用RedisModule_Replicate产生的AOF,前后都会被自动加上multi/exec(保证module内产生的命令具有隔离性)。因此,推荐优先使用RedisModule_ReplicateVerbatim进行复制和写AOF。但是,如果命令中存在诸如版本号等参数,则必须使用RedisModule_Replicate将版本号重写为绝对版本号,将过期时间重写为绝对过期时间。另外,如果一个命令最终RedisModule_Replicate对命令进行重写,则需要保证重写后的命令不会再次发生重写。 复用argv参数:命令传入的argv中的参数类型为RedisModuleString ** ,这些RedisModuleString在命令返回后会被自动Free掉,因此命令中不应该直接引用这些RedisModuleString指针,如果非要这么做(如避免内存拷贝),可以使用RedisModule_RetainString/RedisModule_HoldString增加该RedisModuleString的引用计数,但是之后一定要记得自己手动Free key打开方式:在使用RedisModule_OpenKey打开一个key的时候,要严格区分打开的类型:REDISMODULE_READ、REDISMODULE_WRITE,因为这影响着是否更新内部的stat_keyspace_misses和stat_keyspace_hits信息,还影响的了过期再写入的问题。同时,使用REDISMODULE_READ方式打开的key不能被删除,否则报错 key类型处理:目前只有string的set命令可以强行覆盖其他类型的key,其他的命令在遇到key存在但类型不匹配时需要返回""WRONGTYPE Operation against a key holding the wrong kind of value"错误 多key命令的cluster支持:对于多key的命令,一定要处理好firstkey、lastkey、keystep这三个值,因为只有这三个值对了,在cluster模式下,redis才会去检查这些key是否存在CROSS SLOTS的问题 全局索引、结构:module中如果有自己维护的全局索引,需要谨慎索引中是否包含dbid、key等信息,因为redis的move、rename、swapdb等命令会“偷梁换柱”式的更换key的名字、交换两个dbid,因此此时如果索引没有同步更新,将得到意想不到的错误
根据角色来确定动作:module本身运行的redis可能是一个主也可能是一个备,module内部可以使用RedisModule_GetContextFlags来判断当前redis的角色,并根据不同的角色来采取不同的行为(如是否进行主动过期处理等)
总结
Tair当前支持了非常多的扩展数据结构(其中redis 5.x企业版使用module方式,Tair自研企业版 6.x使用builtin方式),基本涵盖了各种应用场景(具体见介绍文档),其中既有像TairString和TairHash等小而美的数据结构(已经开源),也有像Tair Search和Vector等更为复杂和强大的计算型数据结构,充分满足AIGC背景下各种业务场景,欢迎使用。
介绍文档:https://help.aliyun.com/zh/redis/developer-reference/extended-data-structures-of-apsaradb-for-redis-enhanced-edition
fork(2)-Linux Man Page:http://linux.die.net/man/2/fork
阿里云开发者社区,千万开发者的选择
阿里云开发者社区,百万精品技术内容、千节免费系统课程、丰富的体验场景、活跃的社群活动、行业专家分享交流,欢迎点击【阅读原文】加入我们。
微信扫码关注该文公众号作者