raxGenericInsert 函数
该函数原型如下,用来向 Radix Tree 中插入一个长度为 len 的字符串 s。
/* Insert the element 's' of size 'len', setting as auxiliary data * the pointer 'data'. If the element is already present, the associated * data is updated (only if 'overwrite' is set to 1), and 0 is returned, * otherwise the element is inserted and 1 is returned. On out of memory the * function returns 0 as well but sets errno to ENOMEM, otherwise errno will * be set to 0. */ /* 插入大小为“len”的元素“s”,设置为辅助数据指针“data”。 - 如果元素已存在,则关联的更新数据(仅当“overwrite”设置为1时),并返回0. - 否则将插入元素并返回1。 - 内存不足函数也返回0,但将errno设置为ENOMEM, - 否则errno将设置为0。 */ int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old, int overwrite) { size_t i; int j = 0; /* Split position. If raxLowWalk() stops in a compressed node, the index 'j' represents the char we stopped within the compressed node, that is, the position where to split the node for insertion. */ raxNode *h, **parentlink; debugf("### Insert %.*s with value %p\n", (int)len, s, data); i = raxLowWalk(rax,s,len,&h,&parentlink,&j,NULL); /* If i == len we walked following the whole string. If we are not * in the middle of a compressed node, the string is either already * inserted or this middle node is currently not a key, but can represent * our key. We have just to reallocate the node and make space for the * data pointer. */ if (i == len && (!h->iscompr || j == 0 /* not in the middle if j is 0 */)) { debugf("### Insert: node representing key exists\n"); /* Make space for the value pointer if needed. */ if (!h->iskey || (h->isnull && overwrite)) { h = raxReallocForData(h,data); if (h) memcpy(parentlink,&h,sizeof(h)); } if (h == NULL) { errno = ENOMEM; return 0; } /* Update the existing key if there is already one. */ if (h->iskey) { if (old) *old = raxGetData(h); if (overwrite) raxSetData(h,data); errno = 0; return 0; /* Element already exists. */ } /* Otherwise set the node as a key. Note that raxSetData() * will set h->iskey. */ raxSetData(h,data); rax->numele++; return 1; /* Element inserted. */ } /* If the node we stopped at is a compressed node, we need to * split it before to continue. * * Splitting a compressed node have a few possible cases. * Imagine that the node 'h' we are currently at is a compressed * node containing the string "ANNIBALE" (it means that it represents * nodes A -> N -> N -> I -> B -> A -> L -> E with the only child * pointer of this node pointing at the 'E' node, because remember that * we have characters at the edges of the graph, not inside the nodes * themselves. * * In order to show a real case imagine our node to also point to * another compressed node, that finally points at the node without * children, representing 'O': * * "ANNIBALE" -> "SCO" -> [] * * When inserting we may face the following cases. Note that all the cases * require the insertion of a non compressed node with exactly two * children, except for the last case which just requires splitting a * compressed node. * * 1) Inserting "ANNIENTARE" * * |B| -> "ALE" -> "SCO" -> [] * "ANNI" -> |-| * |E| -> (... continue algo ...) "NTARE" -> [] * * 2) Inserting "ANNIBALI" * * |E| -> "SCO" -> [] * "ANNIBAL" -> |-| * |I| -> (... continue algo ...) [] * * 3) Inserting "AGO" (Like case 1, but set iscompr = 0 into original node) * * |N| -> "NIBALE" -> "SCO" -> [] * |A| -> |-| * |G| -> (... continue algo ...) |O| -> [] * * 4) Inserting "CIAO" * * |A| -> "NNIBALE" -> "SCO" -> [] * |-| * |C| -> (... continue algo ...) "IAO" -> [] * * 5) Inserting "ANNI" * * "ANNI" -> "BALE" -> "SCO" -> [] * * The final algorithm for insertion covering all the above cases is as * follows. * * ============================= ALGO 1 ============================= * * For the above cases 1 to 4, that is, all cases where we stopped in * the middle of a compressed node for a character mismatch, do: * * Let $SPLITPOS be the zero-based index at which, in the * compressed node array of characters, we found the mismatching * character. For example if the node contains "ANNIBALE" and we add * "ANNIENTARE" the $SPLITPOS is 4, that is, the index at which the * mismatching character is found. * * 1. Save the current compressed node $NEXT pointer (the pointer to the * child element, that is always present in compressed nodes). * * 2. Create "split node" having as child the non common letter * at the compressed node. The other non common letter (at the key) * will be added later as we continue the normal insertion algorithm * at step "6". * * 3a. IF $SPLITPOS == 0: * Replace the old node with the split node, by copying the auxiliary * data if any. Fix parent's reference. Free old node eventually * (we still need its data for the next steps of the algorithm). * * 3b. IF $SPLITPOS != 0: * Trim the compressed node (reallocating it as well) in order to * contain $splitpos characters. Change child pointer in order to link * to the split node. If new compressed node len is just 1, set * iscompr to 0 (layout is the same). Fix parent's reference. * * 4a. IF the postfix len (the length of the remaining string of the * original compressed node after the split character) is non zero, * create a "postfix node". If the postfix node has just one character * set iscompr to 0, otherwise iscompr to 1. Set the postfix node * child pointer to $NEXT. * * 4b. IF the postfix len is zero, just use $NEXT as postfix pointer. * * 5. Set child[0] of split node to postfix node. * * 6. Set the split node as the current node, set current index at child[1] * and continue insertion algorithm as usually. * * ============================= ALGO 2 ============================= * * For case 5, that is, if we stopped in the middle of a compressed * node but no mismatch was found, do: * * Let $SPLITPOS be the zero-based index at which, in the * compressed node array of characters, we stopped iterating because * there were no more keys character to match. So in the example of * the node "ANNIBALE", addig the string "ANNI", the $SPLITPOS is 4. * * 1. Save the current compressed node $NEXT pointer (the pointer to the * child element, that is always present in compressed nodes). * * 2. Create a "postfix node" containing all the characters from $SPLITPOS * to the end. Use $NEXT as the postfix node child pointer. * If the postfix node length is 1, set iscompr to 0. * Set the node as a key with the associated value of the new * inserted key. * * 3. Trim the current node to contain the first $SPLITPOS characters. * As usually if the new node length is just 1, set iscompr to 0. * Take the iskey / associated value as it was in the orignal node. * Fix the parent's reference. * * 4. Set the postfix node as the only child pointer of the trimmed * node created at step 1. */ /* ------------------------- ALGORITHM 1 --------------------------- */ if (h->iscompr && i != len) { debugf("ALGO 1: Stopped at compressed node %.*s (%p)\n", h->size, h->data, (void*)h); debugf("Still to insert: %.*s\n", (int)(len-i), s+i); debugf("Splitting at %d: '%c'\n", j, ((char*)h->data)[j]); debugf("Other (key) letter is '%c'\n", s[i]); /* 1: Save next pointer. */ raxNode **childfield = raxNodeLastChildPtr(h); raxNode *next; memcpy(&next,childfield,sizeof(next)); debugf("Next is %p\n", (void*)next); debugf("iskey %d\n", h->iskey); if (h->iskey) { debugf("key value is %p\n", raxGetData(h)); } /* Set the length of the additional nodes we will need. */ size_t trimmedlen = j; size_t postfixlen = h->size - j - 1; int split_node_is_key = !trimmedlen && h->iskey && !h->isnull; size_t nodesize; /* 2: Create the split node. Also allocate the other nodes we'll need * ASAP, so that it will be simpler to handle OOM. */ raxNode *splitnode = raxNewNode(1, split_node_is_key); raxNode *trimmed = NULL; raxNode *postfix = NULL; if (trimmedlen) { nodesize = sizeof(raxNode)+trimmedlen+raxPadding(trimmedlen)+ sizeof(raxNode*); if (h->iskey && !h->isnull) nodesize += sizeof(void*); trimmed = rax_malloc(nodesize); } if (postfixlen) { nodesize = sizeof(raxNode)+postfixlen+raxPadding(postfixlen)+ sizeof(raxNode*); postfix = rax_malloc(nodesize); } /* OOM? Abort now that the tree is untouched. */ if (splitnode == NULL || (trimmedlen && trimmed == NULL) || (postfixlen && postfix == NULL)) { rax_free(splitnode); rax_free(trimmed); rax_free(postfix); errno = ENOMEM; return 0; } splitnode->data[0] = h->data[j]; if (j == 0) { /* 3a: Replace the old node with the split node. */ if (h->iskey) { void *ndata = raxGetData(h); raxSetData(splitnode,ndata); } memcpy(parentlink,&splitnode,sizeof(splitnode)); } else { /* 3b: Trim the compressed node. */ trimmed->size = j; memcpy(trimmed->data,h->data,j); trimmed->iscompr = j > 1 ? 1 : 0; trimmed->iskey = h->iskey; trimmed->isnull = h->isnull; if (h->iskey && !h->isnull) { void *ndata = raxGetData(h); raxSetData(trimmed,ndata); } raxNode **cp = raxNodeLastChildPtr(trimmed); memcpy(cp,&splitnode,sizeof(splitnode)); memcpy(parentlink,&trimmed,sizeof(trimmed)); parentlink = cp; /* Set parentlink to splitnode parent. */ rax->numnodes++; } /* 4: Create the postfix node: what remains of the original * compressed node after the split. */ if (postfixlen) { /* 4a: create a postfix node. */ postfix->iskey = 0; postfix->isnull = 0; postfix->size = postfixlen; postfix->iscompr = postfixlen > 1; memcpy(postfix->data,h->data+j+1,postfixlen); raxNode **cp = raxNodeLastChildPtr(postfix); memcpy(cp,&next,sizeof(next)); rax->numnodes++; } else { /* 4b: just use next as postfix node. */ postfix = next; } /* 5: Set splitnode first child as the postfix node. */ raxNode **splitchild = raxNodeLastChildPtr(splitnode); memcpy(splitchild,&postfix,sizeof(postfix)); /* 6. Continue insertion: this will cause the splitnode to * get a new child (the non common character at the currently * inserted key). */ rax_free(h); h = splitnode; } else if (h->iscompr && i == len) { /* ------------------------- ALGORITHM 2 --------------------------- */ debugf("ALGO 2: Stopped at compressed node %.*s (%p) j = %d\n", h->size, h->data, (void*)h, j); /* Allocate postfix & trimmed nodes ASAP to fail for OOM gracefully. */ size_t postfixlen = h->size - j; size_t nodesize = sizeof(raxNode)+postfixlen+raxPadding(postfixlen)+ sizeof(raxNode*); if (data != NULL) nodesize += sizeof(void*); raxNode *postfix = rax_malloc(nodesize); nodesize = sizeof(raxNode)+j+raxPadding(j)+sizeof(raxNode*); if (h->iskey && !h->isnull) nodesize += sizeof(void*); raxNode *trimmed = rax_malloc(nodesize); if (postfix == NULL || trimmed == NULL) { rax_free(postfix); rax_free(trimmed); errno = ENOMEM; return 0; } /* 1: Save next pointer. */ raxNode **childfield = raxNodeLastChildPtr(h); raxNode *next; memcpy(&next,childfield,sizeof(next)); /* 2: Create the postfix node. */ postfix->size = postfixlen; postfix->iscompr = postfixlen > 1; postfix->iskey = 1; postfix->isnull = 0; memcpy(postfix->data,h->data+j,postfixlen); raxSetData(postfix,data); raxNode **cp = raxNodeLastChildPtr(postfix); memcpy(cp,&next,sizeof(next)); rax->numnodes++; /* 3: Trim the compressed node. */ trimmed->size = j; trimmed->iscompr = j > 1; trimmed->iskey = 0; trimmed->isnull = 0; memcpy(trimmed->data,h->data,j); memcpy(parentlink,&trimmed,sizeof(trimmed)); if (h->iskey) { void *aux = raxGetData(h); raxSetData(trimmed,aux); } /* Fix the trimmed node child pointer to point to * the postfix node. */ cp = raxNodeLastChildPtr(trimmed); memcpy(cp,&postfix,sizeof(postfix)); /* Finish! We don't need to continue with the insertion * algorithm for ALGO 2. The key is already inserted. */ rax->numele++; rax_free(h); return 1; /* Key inserted. */ } /* We walked the radix tree as far as we could, but still there are left * chars in our string. We need to insert the missing nodes. */ while(i < len) { raxNode *child; /* If this node is going to have a single child, and there * are other characters, so that that would result in a chain * of single-childed nodes, turn it into a compressed node. */ if (h->size == 0 && len-i > 1) { debugf("Inserting compressed node\n"); size_t comprsize = len-i; if (comprsize > RAX_NODE_MAX_SIZE) comprsize = RAX_NODE_MAX_SIZE; raxNode *newh = raxCompressNode(h,s+i,comprsize,&child); if (newh == NULL) goto oom; h = newh; memcpy(parentlink,&h,sizeof(h)); parentlink = raxNodeLastChildPtr(h); i += comprsize; } else { debugf("Inserting normal node\n"); raxNode **new_parentlink; raxNode *newh = raxAddChild(h,s[i],&child,&new_parentlink); if (newh == NULL) goto oom; h = newh; memcpy(parentlink,&h,sizeof(h)); parentlink = new_parentlink; i++; } rax->numnodes++; h = child; } raxNode *newh = raxReallocForData(h,data); if (newh == NULL) goto oom; h = newh; if (!h->iskey) rax->numele++; raxSetData(h,data); memcpy(parentlink,&h,sizeof(h)); return 1; /* Element inserted. */ oom: /* This code path handles out of memory after part of the sub-tree was * already modified. Set the node as a key, and then remove it. However we * do that only if the node is a terminal node, otherwise if the OOM * happened reallocating a node in the middle, we don't need to free * anything. */ if (h->size == 0) { h->isnull = 1; h->iskey = 1; rax->numele++; /* Compensate the next remove. */ assert(raxRemove(rax,s,i,NULL) != 0); } errno = ENOMEM; return 0; }
raxLowWalk 函数
该函数原型如下,当需要在 Radix Tree 中查找、插入或是删除节点时,都会调用该函数。
/* Low level function that walks the tree looking for the string * 's' of 'len' bytes. The function returns the number of characters * of the key that was possible to process: if the returned integer * is the same as 'len', then it means that the node corresponding to the * string was found (however it may not be a key in case the node->iskey is * zero or if simply we stopped in the middle of a compressed node, so that * 'splitpos' is non zero). * * Otherwise if the returned integer is not the same as 'len', there was an * early stop during the tree walk because of a character mismatch. * * The node where the search ended (because the full string was processed * or because there was an early stop) is returned by reference as * '*stopnode' if the passed pointer is not NULL. This node link in the * parent's node is returned as '*plink' if not NULL. Finally, if the * search stopped in a compressed node, '*splitpos' returns the index * inside the compressed node where the search ended. This is useful to * know where to split the node for insertion. * * Note that when we stop in the middle of a compressed node with * a perfect match, this function will return a length equal to the * 'len' argument (all the key matched), and will return a *splitpos which is * always positive (that will represent the index of the character immediately * *after* the last match in the current compressed node). * * When instead we stop at a compressed node and *splitpos is zero, it * means that the current node represents the key (that is, none of the * compressed node characters are needed to represent the key, just all * its parents nodes). */ static inline size_t raxLowWalk(rax *rax, unsigned char *s, size_t len, raxNode **stopnode, raxNode ***plink, int *splitpos, raxStack *ts) { raxNode *h = rax->head; raxNode **parentlink = &rax->head; size_t i = 0; /* Position in the string. */ size_t j = 0; /* Position in the node children (or bytes if compressed).*/ while(h->size && i < len) { debugnode("Lookup current node",h); unsigned char *v = h->data; if (h->iscompr) { for (j = 0; j < h->size && i < len; j++, i++) { if (v[j] != s[i]) break; } if (j != h->size) break; } else { /* Even when h->size is large, linear scan provides good * performances compared to other approaches that are in theory * more sounding, like performing a binary search. */ for (j = 0; j < h->size; j++) { if (v[j] == s[i]) break; } if (j == h->size) break; i++; } if (ts) raxStackPush(ts,h); /* Save stack of parent nodes. */ raxNode **children = raxNodeFirstChildPtr(h); if (h->iscompr) j = 0; /* Compressed node only child is at index 0. */ memcpy(&h,children+j,sizeof(h)); parentlink = children+j; j = 0; /* If the new node is non compressed and we do not iterate again (since i == len) set the split position to 0 to signal this node represents the searched key. */ } debugnode("Lookup stop node is",h); if (stopnode) *stopnode = h; if (plink) *plink = parentlink; if (splitpos && h->iscompr) *splitpos = j; return i; }
raxGetData/raxSetData 函数
这两个函数的原型如下所示,它们分别用来获得 raxNode 中保存的 value 指针,以及设置 raxNode 中保存的 value 指针。
- raxGetData
/* Get the node auxiliary data. */ void *raxGetData(raxNode *n) { if (n->isnull) return NULL; void **ndata =(void**)((char*)n+raxNodeCurrentLength(n)-sizeof(void*)); void *data; memcpy(&data,ndata,sizeof(data)); return data; }
- raxSetData
/* Set the node auxiliary data to the specified pointer. */ /* 将节点辅助数据设置为指定的指针 */ void raxSetData(raxNode *n, void *data) { n->iskey = 1; if (data != NULL) { n->isnull = 0; void **ndata = (void**) ((char*)n+raxNodeCurrentLength(n)-sizeof(void*)); memcpy(ndata,&data,sizeof(data)); } else { n->isnull = 1; } }
好了,了解了 Radix Tree 的基本操作函数后,我们最后再来看下,Stream 是如何把 Radix Tree 和 listpack 组合起来使用的。
Stream 如何组合使用 Radix Tree 和 listpack?
我们知道,Stream 保存的消息数据,按照 key-value 形式来看的话,消息 ID 就相当于 key,而消息内容相当于是 value。也就是说,Stream 会使用 Radix Tree 来保存消息 ID,然后将消息内容保存在 listpack 中,并作为消息 ID 的 value,用 raxNode 的 value 指针指向对应的 listpack。
这里我放了一张图,展示了 Stream 结构、rax、raxNode 以及 listpack 相互之间的关系。注意,在这张图中,我们假设就只有一个 streamID 作为 key。
我们可以看到,stream 结构体中的 rax 指针,指向了 Radix Tree 的头节点,也就是 rax 结构体。rax 结构体中的头指针进一步指向了第一个 raxNode。因为我们假设就只有一个 streamID,暂时没有其他 streamID 和该 streamID 共享前缀,所以,当前这个 streamID 就可以用压缩节点保存。
然后,第一个 raxNode 指向了下一个 raxNode,也是 Radix Tree 的叶子节点。这个节点的 size 为 0,它的 value 指针指向了实际的消息内容。
而在消息内容这里,是使用了 listpack 进行保存的。你可以看到,listpack 中是使用了 master entry 来保存键值对类型消息中的键,而值会在 master entry 后面保存。这种保存方式其实也是为了节省内存空间,这是因为很多消息的键是相同的,保存一份就行。关于在 Stream 中,将消息的键和值分开保存到 listpack 中的这种设计方法,我会在后面的课程中继续给你详细介绍。
另外,为了方便你更好地掌握非压缩节点和压缩节点,我再给你总结下它们的相同之处和区别,你也可以来整体回顾下。
它们的相同之处在于:
- 都有保存元数据的节点头 HDR;都会包含指向子节点的指针,以及子节点所代表的字符串。
- 从根节点到当前节点路径上的字符串如果是 Radix Tree 的一个 key,它们都会包含指向 key 对应 value 的指针。
不同之处在于:
- 非压缩节点指向的子节点,每个子节点代表一个字符,非压缩节点可以指向多个子节点;
- 压缩节点指向的子节点,代表的是一个合并字符串,压缩节点只能指向一个子节点。
而除了学习 raxNode,我还给你介绍了下 Radix Tree 中几个基本操作函数的作用,并展示了 Stream 类型是如何把消息 ID 和消息内容,分别保存在 Radix Tree 和 listpack 中的。这里你要注意的是,因为 Radix Tree 在保存具有公共前缀的数据时,能有效节省内存开销。同时,Radix Tree 本身也是有序的树型索引,可以支持单点和范围查询。所以,Redis 把消息 ID 保存在 Radix Tree 中,既可以节省内存空间,也能高效支持消息 ID 的查询。而 listpack 本身是紧凑列表,在保存大量消息内容的同时,也能有效节省内存。所以我希望,你能通过 Stream 对 Radix Tree 和 listpack 的使用,举一反三,把它们用在相应的消息存取或是大量字符串存取的场景中。
Radix Tree 优劣势
作为有序索引,Radix Tree 也能提供范围查询,和我们日常使用的 B+ 树,以及第5讲中介绍的跳表相比,你觉得 Radix Tree 有什么优势和不足么?
1、Radix Tree 优势
- 本质上是前缀树,所以存储有「公共前缀」的数据时,比 B+ 树、跳表节省内存
- 没有公共前缀的数据项,压缩存储,value 用 listpack 存储,也可以节省内存
- 查询复杂度是 O(K),只与「目标长度」有关,与总数据量无关
- 这种数据结构也经常用在搜索引擎提示、文字自动补全等场景
Stream 在存消息时,推荐使用默认自动生成的「时间戳+序号」作为消息 ID,不建议自己指定消息 ID,这样才能发挥 Radix Tree 公共前缀的优势。
2、Radix Tree 不足
- 如果数据集公共前缀较少,会导致内存占用多
- 增删节点需要处理其它节点的「分裂、合并」,跳表只需调整前后指针即可
- B+ 树、跳表范围查询友好,直接遍历链表即可,Radix Tree 需遍历树结构
- 实现难度高比 B+ 树、跳表复杂
每种数据结构都是在面对不同问题场景下,才被设计出来的,结合各自场景中的数据特点,使用优势最大的数据结构才是正解。