source: mainline/kernel/generic/src/adt/cht.c@ 14c9aa6

Last change on this file since 14c9aa6 was 14c9aa6, checked in by Adam Hraska <adam.hraska+hos@…>, 13 years ago

cht: Added initial working concurrent hash table. Builds and runs.

1/*
2 * Copyright (c) 2012 Adam Hraska
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30/** @addtogroup genericadt
31 * @{
32 */
33
34/**
35 * @file
36 * @brief Scalable resizable concurrent lock-free hash table.
37 *
38 */
39
40#include <adt/cht.h>
41#include <adt/hash.h>
42#include <debug.h>
43#include <memstr.h>
44#include <mm/slab.h>
45#include <arch/barrier.h>
46#include <compiler/barrier.h>
47#include <atomic.h>
48#include <synch/rcu.h>
49
50
51/* Logarithm of the min bucket count. 2^6 == 64 buckets. */
52#define CHT_MIN_ORDER 6
53/* Logarithm of the max bucket count. */
54#define CHT_MAX_ORDER (8 * sizeof(size_t))
55/* Minimum number of hash table buckets. */
56#define CHT_MIN_BUCKET_CNT (1 << CHT_MIN_ORDER)
57/* Must be a power of 2. */
58#define CHT_MAX_LOAD 2
59
60typedef cht_ptr_t marked_ptr_t;
61typedef bool (*equal_pred_t)(void *arg, const cht_link_t *item);
62
63typedef enum mark {
64 N_NORMAL = 0,
65 N_DELETED = 1,
66 N_CONST = 1,
67 N_INVALID = 3,
68 N_JOIN = 2,
69 N_JOIN_FOLLOWS = 2,
70 N_MARK_MASK = 3
71} mark_t;
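/*
 * Marks are packed into the two least significant bits of a node's next
 * pointer (see make_link(), get_next() and get_mark() below), which is why
 * N_MARK_MASK == 3 and why node addresses must be at least 4-byte aligned.
 * Some mark values intentionally alias (N_DELETED/N_CONST and
 * N_JOIN/N_JOIN_FOLLOWS) because the aliased marks are only ever used in
 * different positions, eg bucket heads are never marked deleted and
 * ordinary nodes are never marked const.
 */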
72
73typedef enum walk_mode {
74 WM_NORMAL = 4,
75 WM_LEAVE_JOIN,
76 WM_MOVE_JOIN_FOLLOWS
77} walk_mode_t;
78
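/*
 * A search/update window into a bucket: ppred points to the link field of
 * cur's predecessor (or to the bucket head), cur is the node currently
 * under inspection (0 at the end of a bucket) and last is the node
 * traversed just before cur (see next_wnd()).
 */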
79typedef struct wnd {
80 marked_ptr_t *ppred;
81 cht_link_t *cur;
82 cht_link_t *last;
83} wnd_t;
84
85/* Fwd decl. */
86static size_t size_to_order(size_t bucket_cnt, size_t min_order);
87static cht_buckets_t *alloc_buckets(size_t order, bool set_invalid);
88static cht_link_t *search_bucket(cht_t *h, marked_ptr_t head, void *key,
89 size_t search_hash);
90static cht_link_t *find_resizing(cht_t *h, void *key, size_t hash,
91 marked_ptr_t old_head, size_t old_idx);
92static bool insert_impl(cht_t *h, cht_link_t *item, bool unique);
93static bool insert_at(cht_link_t *item, const wnd_t *wnd, walk_mode_t walk_mode,
94 bool *resizing);
95static bool has_duplicates(cht_t *h, const cht_link_t *item, size_t hash,
96 const wnd_t *cwnd);
97static cht_link_t *find_duplicate(cht_t *h, const cht_link_t *item, size_t hash,
98 cht_link_t *start);
99static bool remove_pred(cht_t *h, size_t hash, equal_pred_t pred, void *pred_arg);
100static bool delete_at(cht_t *h, wnd_t *wnd, walk_mode_t walk_mode,
101 bool *deleted_but_gc, bool *resizing);
102static bool mark_deleted(cht_link_t *cur, walk_mode_t walk_mode, bool *resizing);
103static bool unlink_from_pred(wnd_t *wnd, walk_mode_t walk_mode, bool *resizing);
104static bool find_wnd_and_gc_pred(cht_t *h, size_t hash, walk_mode_t walk_mode,
105 equal_pred_t pred, void *pred_arg, wnd_t *wnd, bool *resizing);
106static bool find_wnd_and_gc(cht_t *h, size_t hash, walk_mode_t walk_mode,
107 wnd_t *wnd, bool *resizing);
108static bool gc_deleted_node(cht_t *h, walk_mode_t walk_mode, wnd_t *wnd,
109 bool *resizing);
110static bool join_completed(cht_t *h, const wnd_t *wnd);
111static void upd_resizing_head(cht_t *h, size_t hash, marked_ptr_t **phead,
112 bool *join_finishing, walk_mode_t *walk_mode);
113static void item_removed(cht_t *h);
114static void item_inserted(cht_t *h);
115static void free_later(cht_t *h, cht_link_t *item);
116static void help_head_move(marked_ptr_t *psrc_head, marked_ptr_t *pdest_head);
117static void start_head_move(marked_ptr_t *psrc_head);
118static void mark_const(marked_ptr_t *psrc_head);
119static void complete_head_move(marked_ptr_t *psrc_head, marked_ptr_t *pdest_head);
120static void split_bucket(cht_t *h, marked_ptr_t *psrc_head,
121 marked_ptr_t *pdest_head, size_t split_hash);
122static void mark_join_follows(cht_t *h, marked_ptr_t *psrc_head,
123 size_t split_hash, wnd_t *wnd);
124static void mark_join_node(cht_link_t *join_node);
125static void join_buckets(cht_t *h, marked_ptr_t *psrc_head,
126 marked_ptr_t *pdest_head, size_t split_hash);
127static void link_to_join_node(cht_t *h, marked_ptr_t *pdest_head,
128 cht_link_t *join_node, size_t split_hash);
129static void resize_table(work_t *arg);
130static void grow_table(cht_t *h);
131static void shrink_table(cht_t *h);
132static void cleanup_join_node(cht_t *h, marked_ptr_t *new_head);
133static void clear_join_and_gc(cht_t *h, cht_link_t *join_node,
134 marked_ptr_t *new_head);
135static void cleanup_join_follows(cht_t *h, marked_ptr_t *new_head);
136static marked_ptr_t make_link(const cht_link_t *next, mark_t mark);
137static cht_link_t * get_next(marked_ptr_t link);
138static mark_t get_mark(marked_ptr_t link);
139static void next_wnd(wnd_t *wnd);
140static bool same_node_pred(void *node, const cht_link_t *item2);
141static size_t key_hash(cht_t *h, void *key);
142static size_t node_hash(cht_t *h, const cht_link_t *item);
143static size_t calc_split_hash(size_t split_idx, size_t order);
144static size_t calc_bucket_idx(size_t hash, size_t order);
145static size_t grow_to_split_idx(size_t old_idx);
146static size_t grow_idx(size_t idx);
147static size_t shrink_idx(size_t idx);
148static marked_ptr_t cas_link(marked_ptr_t *link, const cht_link_t *cur_next,
149 mark_t cur_mark, const cht_link_t *new_next, mark_t new_mark);
150static marked_ptr_t _cas_link(marked_ptr_t *link, marked_ptr_t cur,
151 marked_ptr_t new);
152static void cas_order_barrier(void);
153
154
155bool cht_create(cht_t *h, size_t init_size, size_t min_size, cht_ops_t *op)
156{
157 ASSERT(h);
158 ASSERT(op && op->hash && op->key_hash && op->equal && op->key_equal);
159
160 /* All operations are compulsory. */
161 if (!op || !op->hash || !op->key_hash || !op->equal || !op->key_equal)
162 return false;
163
164 size_t min_order = size_to_order(min_size, CHT_MIN_ORDER);
165 size_t order = size_to_order(init_size, min_order);
166
167 h->b = alloc_buckets(order, false);
168
169 if (!h->b)
170 return false;
171
172 h->min_order = min_order;
173 h->new_b = 0;
174 h->op = op;
175 atomic_set(&h->item_cnt, 0);
176 atomic_set(&h->resize_reqs, 0);
177 /* Ensure the initialization takes place before we start using the table. */
178 write_barrier();
179
180 return true;
181}
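/*
 * Illustrative usage sketch, not part of the original file and compiled
 * out: a hypothetical value type embedding cht_link_t plus the cht_ops_t
 * callbacks that cht_create() requires. The val_* names are examples only
 * and the callback signatures are assumptions inferred from how h->op is
 * invoked elsewhere in this file.
 */
#if 0
typedef struct val {
	cht_link_t link;	/* First member, so (val_t *) and (cht_link_t *) coincide. */
	size_t key;
} val_t;

static size_t val_hash(const cht_link_t *item)
{
	return ((const val_t *) item)->key;
}

static size_t val_key_hash(void *key)
{
	return *(size_t *) key;
}

static bool val_equal(const cht_link_t *item1, const cht_link_t *item2)
{
	return ((const val_t *) item1)->key == ((const val_t *) item2)->key;
}

static bool val_key_equal(void *key, const cht_link_t *item)
{
	return *(size_t *) key == ((const val_t *) item)->key;
}

static void val_removed(cht_link_t *item)
{
	/* Invoked only after a grace period; no readers reference item anymore. */
	free(item);
}

static cht_ops_t val_ops = {
	.hash = val_hash,
	.key_hash = val_key_hash,
	.equal = val_equal,
	.key_equal = val_key_equal,
	.remove_callback = val_removed,
};

static cht_t val_table;

static void val_table_init(void)
{
	/* Zero sizes fall back to the minimum order (CHT_MIN_ORDER). */
	if (!cht_create(&val_table, 0, 0, &val_ops)) {
		/* Handle allocation failure. */
	}
}
#endif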
182
183static cht_buckets_t *alloc_buckets(size_t order, bool set_invalid)
184{
185 size_t bucket_cnt = (1 << order);
186 size_t bytes =
187 sizeof(cht_buckets_t) + (bucket_cnt - 1) * sizeof(marked_ptr_t);
188 cht_buckets_t *b = malloc(bytes, FRAME_ATOMIC);
189
190 if (!b)
191 return 0;
192
193 b->order = order;
194
195 marked_ptr_t head_link
196 = set_invalid ? make_link(0, N_INVALID) : make_link(0, N_NORMAL);
197
198 for (size_t i = 0; i < bucket_cnt; ++i) {
199 b->head[i] = head_link;
200 }
201
202 return b;
203}
204
205static size_t size_to_order(size_t bucket_cnt, size_t min_order)
206{
207 size_t order = min_order;
208
209 /* Find a power of two such that bucket_cnt <= 2^order */
210 do {
211 if (bucket_cnt <= ((size_t)1 << order))
212 return order;
213
214 ++order;
215 } while (order < CHT_MAX_ORDER);
216
217 return order;
218}
219
220
221void cht_destroy(cht_t *h)
222{
223 /* Wait for resize to complete. */
224 while (0 < atomic_get(&h->resize_reqs)) {
225 rcu_barrier();
226 }
227
228 /* Wait for all remove_callback()s to complete. */
229 rcu_barrier();
230
231 free(h->b);
232 h->b = 0;
233}
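/*
 * Illustrative teardown sketch (compiled out), continuing the example
 * above. The caller is assumed to have stopped all other use of the table
 * beforehand; cht_destroy() itself only waits for pending resizes and
 * remove callbacks.
 */
#if 0
static void val_table_fini(void)
{
	cht_destroy(&val_table);
}
#endif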
234
235cht_link_t *cht_find(cht_t *h, void *key)
236{
237 /* Make the most recent changes to the table visible. */
238 read_barrier();
239 return cht_find_lazy(h, key);
240}
241
242
243cht_link_t *cht_find_lazy(cht_t *h, void *key)
244{
245 ASSERT(h);
246 ASSERT(rcu_read_locked());
247
248 size_t hash = key_hash(h, key);
249
250 cht_buckets_t *b = rcu_access(h->b);
251 size_t idx = calc_bucket_idx(hash, b->order);
252 /*
253 * No need for access_once. b->head[idx] will point to an allocated node
254 * even if marked invalid until we exit rcu read section.
255 */
256 marked_ptr_t head = b->head[idx];
257
258 if (N_INVALID == get_mark(head))
259 return find_resizing(h, key, hash, head, idx);
260
261 return search_bucket(h, head, key, hash);
262}
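/*
 * Illustrative lookup sketch (compiled out), continuing the example above.
 * Lookups must run inside an RCU read-side critical section and the
 * returned item may only be dereferenced until rcu_read_unlock(); removed
 * items are freed only after a grace period (see free_later()).
 */
#if 0
static bool val_table_contains(size_t key)
{
	rcu_read_lock();

	cht_link_t *link = cht_find(&val_table, &key);
	/* link may be dereferenced only before rcu_read_unlock(). */

	rcu_read_unlock();
	return 0 != link;
}
#endif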
263
264cht_link_t *cht_find_next(cht_t *h, const cht_link_t *item)
265{
266 /* Make the most recent changes to the table visible. */
267 read_barrier();
268 return cht_find_next_lazy(h, item);
269}
270
271cht_link_t *cht_find_next_lazy(cht_t *h, const cht_link_t *item)
272{
273 ASSERT(h);
274 ASSERT(rcu_read_locked());
275 ASSERT(item);
276
277 return find_duplicate(h, item, node_hash(h, item), get_next(item->link));
278}
279
280static cht_link_t *search_bucket(cht_t *h, marked_ptr_t head, void *key,
281 size_t search_hash)
282{
283 cht_link_t *cur = get_next(head);
284
285 while (cur) {
286 /*
287 * It is safe to access nodes even outside of this bucket (eg when
288 * splitting the bucket). The resizer makes sure that any node we
289 * may find by following the next pointers is allocated.
290 */
291 size_t cur_hash = node_hash(h, cur);
292
293 if (cur_hash >= search_hash) {
294 if (cur_hash != search_hash)
295 return 0;
296
297 int present = !(N_DELETED & get_mark(cur->link));
298 if (present && h->op->key_equal(key, cur))
299 return cur;
300 }
301
302 cur = get_next(cur->link);
303 }
304
305 return 0;
306}
307
308static cht_link_t *find_resizing(cht_t *h, void *key, size_t hash,
309 marked_ptr_t old_head, size_t old_idx)
310{
311 ASSERT(N_INVALID == get_mark(old_head));
312 ASSERT(h->new_b);
313
314 size_t new_idx = calc_bucket_idx(hash, h->new_b->order);
315 marked_ptr_t new_head = h->new_b->head[new_idx];
316 marked_ptr_t search_head = new_head;
317
318 /* Growing. */
319 if (h->b->order < h->new_b->order) {
320 /*
321 * Old bucket head is invalid, so it must have been already
322 * moved. Make the new head visible if still not visible, ie
323 * invalid.
324 */
325 if (N_INVALID == get_mark(new_head)) {
326 /*
327 * We should be searching a newly added bucket but the old
 328 * moved bucket has not yet been split (it's marked invalid)
329 * or we have not yet seen the split.
330 */
331 if (grow_idx(old_idx) != new_idx) {
332 /*
333 * Search the moved bucket. It is guaranteed to contain
334 * items of the newly added bucket that were present
335 * before the moved bucket was split.
336 */
337 new_head = h->new_b->head[grow_idx(old_idx)];
338 }
339
340 /* new_head is now the moved bucket, either valid or invalid. */
341
342 /*
343 * The old bucket was definitely moved to new_head but the
344 * change of new_head had not yet propagated to this cpu.
345 */
346 if (N_INVALID == get_mark(new_head)) {
347 /*
348 * We could issue a read_barrier() and make the now valid
349 * moved bucket head new_head visible, but instead fall back
350 * on using the old bucket. Although the old bucket head is
351 * invalid, it points to a node that is allocated and in the
352 * right bucket. Before the node can be freed, it must be
353 * unlinked from the head (or another item after that item
354 * modified the new_head) and a grace period must elapse.
 355 * As a result, had the node already been freed, the grace
 356 * period preceding the free() would make the unlink and
357 * any changes to new_head visible. Therefore, it is safe
358 * to use the node pointed to from the old bucket head.
359 */
360
361 search_head = old_head;
362 } else {
363 search_head = new_head;
364 }
365 }
366
367 return search_bucket(h, search_head, key, hash);
368 } else if (h->b->order > h->new_b->order) {
369 /* Shrinking. */
370
371 /* Index of the bucket in the old table that was moved. */
372 size_t move_src_idx = grow_idx(new_idx);
373 marked_ptr_t moved_old_head = h->b->head[move_src_idx];
374
375 /*
376 * h->b->head[move_src_idx] had already been moved to new_head
377 * but the change to new_head had not yet propagated to us.
378 */
379 if (N_INVALID == get_mark(new_head)) {
380 /*
381 * new_head is definitely valid and we could make it visible
382 * to this cpu with a read_barrier(). Instead, use the bucket
383 * in the old table that was moved even though it is now marked
384 * as invalid. The node it points to must be allocated because
385 * a grace period would have to elapse before it could be freed;
386 * and the grace period would make the now valid new_head
387 * visible to all cpus.
388 *
389 * Note that move_src_idx may not be the same as old_idx.
390 * If move_src_idx != old_idx then old_idx is the bucket
391 * in the old table that is not moved but instead it is
392 * appended to the moved bucket, ie it is added at the tail
393 * of new_head. In that case an invalid old_head notes that
394 * it had already been merged into (the moved) new_head.
395 * We will try to search that bucket first because it
396 * may contain some newly added nodes after the bucket
397 * join. Moreover, the bucket joining link may already be
398 * visible even if new_head is not. Therefore, if we're
399 * lucky we'll find the item via moved_old_head. In any
400 * case, we'll retry in proper old_head if not found.
401 */
402 search_head = moved_old_head;
403 }
404
405 cht_link_t *ret = search_bucket(h, search_head, key, hash);
406
407 if (ret)
408 return ret;
409 /*
410 * Bucket old_head was already joined with moved_old_head
411 * in the new table but we have not yet seen change of the
412 * joining link (or the item is not in the table).
413 */
414 if (move_src_idx != old_idx && get_next(old_head)) {
415 /*
416 * Note that old_head (the bucket to be merged into new_head)
417 * points to an allocated join node (if non-null) even if marked
 418 * invalid. Before the resizer lets join nodes be unlinked
419 * (and freed) it sets old_head to 0 and waits for a grace period.
420 * So either the invalid old_head points to join node; or old_head
421 * is null and we would have seen a completed bucket join while
422 * traversing search_head.
423 */
424 ASSERT(N_JOIN & get_mark(get_next(old_head)->link));
425 return search_bucket(h, old_head, key, hash);
426 }
427
428 return 0;
429 } else {
430 /*
431 * Resize is almost done. The resizer is waiting to make
432 * sure all cpus see that the new table replaced the old one.
433 */
434 ASSERT(h->b->order == h->new_b->order);
435 /*
436 * The resizer must ensure all new bucket heads are visible before
437 * replacing the old table.
438 */
439 ASSERT(N_NORMAL == get_mark(new_head));
440 return search_bucket(h, new_head, key, hash);
441 }
442}
443
444
445void cht_insert(cht_t *h, cht_link_t *item)
446{
447 insert_impl(h, item, false);
448}
449
450bool cht_insert_unique(cht_t *h, cht_link_t *item)
451{
452 return insert_impl(h, item, true);
453}
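/*
 * Illustrative insertion sketch (compiled out), continuing the example
 * above. The caller allocates the whole item and hands its embedded
 * cht_link_t to the table; FRAME_ATOMIC mirrors the allocation flags used
 * by alloc_buckets() above.
 */
#if 0
static bool val_table_add_unique(size_t key)
{
	val_t *v = malloc(sizeof(val_t), FRAME_ATOMIC);
	if (!v)
		return false;

	v->key = key;

	if (!cht_insert_unique(&val_table, &v->link)) {
		/* An equal item is already present - the new one was not linked in. */
		free(v);
		return false;
	}

	return true;
}
#endif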
454
455static bool insert_impl(cht_t *h, cht_link_t *item, bool unique)
456{
457 rcu_read_lock();
458
459 cht_buckets_t *b = rcu_access(h->b);
460 size_t hash = node_hash(h, item);
461 size_t idx = calc_bucket_idx(hash, b->order);
462 marked_ptr_t *phead = &b->head[idx];
463
464 bool resizing = false;
465 bool inserted;
466
467 do {
468 walk_mode_t walk_mode = WM_NORMAL;
469 bool join_finishing;
470
471 resizing = resizing || (N_NORMAL != get_mark(*phead));
472
473 /* The table is resizing. Get the correct bucket head. */
474 if (resizing) {
475 upd_resizing_head(h, hash, &phead, &join_finishing, &walk_mode);
476 }
477
478 wnd_t wnd = {
479 .ppred = phead,
480 .cur = get_next(*phead),
481 .last = 0
482 };
483
484 if (!find_wnd_and_gc(h, hash, walk_mode, &wnd, &resizing)) {
485 /* Could not GC a node; or detected an unexpected resize. */
486 continue;
487 }
488
489 if (unique && has_duplicates(h, item, hash, &wnd)) {
490 rcu_read_unlock();
491 return false;
492 }
493
494 inserted = insert_at(item, &wnd, walk_mode, &resizing);
495 } while (!inserted);
496
497 rcu_read_unlock();
498
499 item_inserted(h);
500 return true;
501}
502
503static bool insert_at(cht_link_t *item, const wnd_t *wnd, walk_mode_t walk_mode,
504 bool *resizing)
505{
506 marked_ptr_t ret;
507
508 if (walk_mode == WM_NORMAL) {
509 item->link = make_link(wnd->cur, N_NORMAL);
510 /* Initialize the item before adding it to a bucket. */
511 memory_barrier();
512
513 /* Link a clean/normal predecessor to the item. */
514 ret = cas_link(wnd->ppred, wnd->cur, N_NORMAL, item, N_NORMAL);
515
516 if (ret == make_link(wnd->cur, N_NORMAL)) {
517 return true;
518 } else {
519 /* This includes an invalid head but not a const head. */
520 *resizing = ((N_JOIN_FOLLOWS | N_JOIN) & get_mark(ret));
521 return false;
522 }
523 } else if (walk_mode == WM_MOVE_JOIN_FOLLOWS) {
524 /* Move JOIN_FOLLOWS mark but filter out the DELETED mark. */
525 mark_t jf_mark = get_mark(*wnd->ppred) & N_JOIN_FOLLOWS;
526 item->link = make_link(wnd->cur, jf_mark);
527 /* Initialize the item before adding it to a bucket. */
528 memory_barrier();
529
530 /* Link the not-deleted predecessor to the item. Move its JF mark. */
531 ret = cas_link(wnd->ppred, wnd->cur, jf_mark, item, N_NORMAL);
532
533 return ret == make_link(wnd->cur, jf_mark);
534 } else {
535 ASSERT(walk_mode == WM_LEAVE_JOIN);
536
537 item->link = make_link(wnd->cur, N_NORMAL);
538 /* Initialize the item before adding it to a bucket. */
539 memory_barrier();
540
541 mark_t pred_mark = get_mark(*wnd->ppred);
542 /* If the predecessor is a join node it may be marked deleted.*/
543 mark_t exp_pred_mark = (N_JOIN & pred_mark) ? pred_mark : N_NORMAL;
544
545 ret = cas_link(wnd->ppred, wnd->cur, exp_pred_mark, item, exp_pred_mark);
546 return ret == make_link(wnd->cur, exp_pred_mark);
547 }
548}
549
550static bool has_duplicates(cht_t *h, const cht_link_t *item, size_t hash,
551 const wnd_t *wnd)
552{
553 ASSERT(0 == wnd->cur || hash <= node_hash(h, wnd->cur));
554
555 if (0 == wnd->cur || hash < node_hash(h, wnd->cur))
556 return false;
557
558 /*
 559 * Load the most recent node marks. Otherwise we might pronounce a
 560 * logically deleted node a duplicate of the item just because
561 * the deleted node's DEL mark had not yet propagated to this cpu.
562 */
563 read_barrier();
564 return 0 != find_duplicate(h, item, hash, wnd->cur);
565}
566
567static cht_link_t *find_duplicate(cht_t *h, const cht_link_t *item, size_t hash,
568 cht_link_t *start)
569{
570 ASSERT(0 == start || hash <= node_hash(h, start));
571
572 cht_link_t *cur = start;
573
574 while (cur && node_hash(h, cur) == hash) {
575 bool deleted = (N_DELETED & get_mark(cur->link));
576
577 /* Skip logically deleted nodes. */
578 if (!deleted && h->op->equal(item, cur))
579 return cur;
580
581 cur = get_next(cur->link);
582 }
583
584 return 0;
585}
586
587size_t cht_remove_key(cht_t *h, void *key)
588{
589 ASSERT(h);
590
591 size_t hash = key_hash(h, key);
592 size_t removed = 0;
593
594 while (remove_pred(h, hash, h->op->key_equal, key))
595 ++removed;
596
597 return removed;
598}
599
600bool cht_remove_item(cht_t *h, cht_link_t *item)
601{
602 ASSERT(h);
603 ASSERT(item);
604
605 /*
606 * Even though we know the node we want to delete we must unlink it
607 * from the correct bucket and from a clean/normal predecessor. Therefore,
608 * we search for it again from the beginning of the correct bucket.
609 */
610 size_t hash = node_hash(h, item);
611 return remove_pred(h, hash, same_node_pred, item);
612}
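/*
 * Illustrative removal sketch (compiled out), continuing the example above.
 * Removal only unlinks matching items; each one is handed to
 * op->remove_callback (val_removed() above) after a grace period, which is
 * where it is actually freed.
 */
#if 0
static size_t val_table_del(size_t key)
{
	/* Returns the number of items with an equal key that were unlinked. */
	return cht_remove_key(&val_table, &key);
}
#endif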
613
614
615static bool remove_pred(cht_t *h, size_t hash, equal_pred_t pred, void *pred_arg)
616{
617 rcu_read_lock();
618
619 bool resizing = false;
620 bool deleted = false;
621 bool deleted_but_gc = false;
622
623 cht_buckets_t *b = rcu_access(h->b);
624 size_t idx = calc_bucket_idx(hash, b->order);
625 marked_ptr_t *phead = &b->head[idx];
626
627 do {
628 walk_mode_t walk_mode = WM_NORMAL;
629 bool join_finishing = false;
630
631 resizing = resizing || (N_NORMAL != get_mark(*phead));
632
633 /* The table is resizing. Get the correct bucket head. */
634 if (resizing) {
635 upd_resizing_head(h, hash, &phead, &join_finishing, &walk_mode);
636 }
637
638 wnd_t wnd = {
639 .ppred = phead,
640 .cur = get_next(*phead),
641 .last = 0
642 };
643
644 if (!find_wnd_and_gc_pred(
645 h, hash, walk_mode, pred, pred_arg, &wnd, &resizing)) {
646 /* Could not GC a node; or detected an unexpected resize. */
647 continue;
648 }
649
650 /*
651 * The item lookup is affected by a bucket join but effects of
652 * the bucket join have not been seen while searching for the item.
653 */
654 if (join_finishing && !join_completed(h, &wnd)) {
655 /*
656 * Bucket was appended at the end of another but the next
657 * ptr linking them together was not visible on this cpu.
658 * join_completed() makes this appended bucket visible.
659 */
660 continue;
661 }
662
663 /* Already deleted, but delete_at() requested one GC pass. */
664 if (deleted_but_gc)
665 break;
666
667 bool found = wnd.cur && pred(pred_arg, wnd.cur);
668
669 if (!found) {
670 rcu_read_unlock();
671 return false;
672 }
673
674 deleted = delete_at(h, &wnd, walk_mode, &deleted_but_gc, &resizing);
675 } while (!deleted || deleted_but_gc);
676
677 rcu_read_unlock();
678 return true;
679}
680
681
682static bool delete_at(cht_t *h, wnd_t *wnd, walk_mode_t walk_mode,
683 bool *deleted_but_gc, bool *resizing)
684{
685 ASSERT(wnd->cur);
686
687 *deleted_but_gc = false;
688
689 if (!mark_deleted(wnd->cur, walk_mode, resizing)) {
690 /* Already deleted, or unexpectedly marked as JOIN/JOIN_FOLLOWS. */
691 return false;
692 }
693
694 /* Marked deleted. Unlink from the bucket. */
695
696 /* Never unlink join nodes. */
697 if (walk_mode == WM_LEAVE_JOIN && (N_JOIN & get_mark(wnd->cur->link)))
698 return true;
699
700 cas_order_barrier();
701
702 if (unlink_from_pred(wnd, walk_mode, resizing)) {
703 free_later(h, wnd->cur);
704 } else {
705 *deleted_but_gc = true;
706 }
707
708 return true;
709}
710
711static bool mark_deleted(cht_link_t *cur, walk_mode_t walk_mode, bool *resizing)
712{
713 ASSERT(cur);
714
715 /*
 716 * We could loop here if the CAS fails, but to keep things simple
 717 * we retry from the head of the bucket instead.
718 */
719
720 cht_link_t *next = get_next(cur->link);
721
722 if (walk_mode == WM_NORMAL) {
723 /* Only mark clean/normal nodes - JF/JN is used only during resize. */
724 marked_ptr_t ret = cas_link(&cur->link, next, N_NORMAL, next, N_DELETED);
725
726 if (ret != make_link(next, N_NORMAL)) {
727 *resizing = (N_JOIN | N_JOIN_FOLLOWS) & get_mark(ret);
728 return false;
729 }
730 } else {
731 ASSERT(N_JOIN == N_JOIN_FOLLOWS);
732
733 /* Keep the N_JOIN/N_JOIN_FOLLOWS mark but strip N_DELETED. */
734 mark_t cur_mark = get_mark(cur->link) & N_JOIN_FOLLOWS;
735
736 marked_ptr_t ret =
737 cas_link(&cur->link, next, cur_mark, next, cur_mark | N_DELETED);
738
739 if (ret != make_link(next, cur_mark))
740 return false;
741 }
742
743 return true;
744}
745
746static bool unlink_from_pred(wnd_t *wnd, walk_mode_t walk_mode, bool *resizing)
747{
748 ASSERT(wnd->cur && (N_DELETED & get_mark(wnd->cur->link)));
749
750 cht_link_t *next = get_next(wnd->cur->link);
751
752 if (walk_mode == WM_LEAVE_JOIN) {
753 /* Never try to unlink join nodes. */
754 ASSERT(!(N_JOIN & get_mark(wnd->cur->link)));
755
756 mark_t pred_mark = get_mark(*wnd->ppred);
757 /* Succeed only if the predecessor is clean/normal or a join node. */
758 mark_t exp_pred_mark = (N_JOIN & pred_mark) ? pred_mark : N_NORMAL;
759
760 marked_ptr_t pred_link = make_link(wnd->cur, exp_pred_mark);
761 marked_ptr_t next_link = make_link(next, exp_pred_mark);
762
763 if (pred_link != _cas_link(wnd->ppred, pred_link, next_link))
764 return false;
765 } else {
766 ASSERT(walk_mode == WM_MOVE_JOIN_FOLLOWS || walk_mode == WM_NORMAL);
767 /* Move the JF mark if set. Clear DEL mark. */
768 mark_t cur_mark = N_JOIN_FOLLOWS & get_mark(wnd->cur->link);
769
770 /* The predecessor must be clean/normal. */
771 marked_ptr_t pred_link = make_link(wnd->cur, N_NORMAL);
772 /* Link to cur's successor keeping/copying cur's JF mark. */
773 marked_ptr_t next_link = make_link(next, cur_mark);
774
775 marked_ptr_t ret = _cas_link(wnd->ppred, pred_link, next_link);
776
777 if (pred_link != ret) {
778 /* If we're not resizing the table there are no JF/JN nodes. */
779 *resizing = (walk_mode == WM_NORMAL)
780 && (N_JOIN_FOLLOWS & get_mark(ret));
781 return false;
782 }
783 }
784
785 return true;
786}
787
788
789static bool find_wnd_and_gc_pred(cht_t *h, size_t hash, walk_mode_t walk_mode,
790 equal_pred_t pred, void *pred_arg, wnd_t *wnd, bool *resizing)
791{
792 if (!wnd->cur)
793 return true;
794
795 /*
796 * A read barrier is not needed here to bring up the most recent
797 * node marks (esp the N_DELETED). At worst we'll try to delete
798 * an already deleted node; fail in delete_at(); and retry.
799 */
800
801 size_t cur_hash = node_hash(h, wnd->cur);
802
803 while (cur_hash <= hash) {
804 /* GC any deleted nodes on the way. */
805 if (N_DELETED & get_mark(wnd->cur->link)) {
806 if (!gc_deleted_node(h, walk_mode, wnd, resizing)) {
807 /* Retry from the head of a bucket. */
808 return false;
809 }
810 } else {
811 /* Is this the node we were looking for? */
812 if (cur_hash == hash && pred(pred_arg, wnd->cur))
813 return true;
814
815 next_wnd(wnd);
816 }
817
818 /* The searched for node is not in the current bucket. */
819 if (!wnd->cur)
820 return true;
821
822 cur_hash = node_hash(h, wnd->cur);
823 }
824
825 /* The searched for node is not in the current bucket. */
826 return true;
827}
828
829/* todo: comment different semantics (eg deleted JN first w/ specific hash) */
830static bool find_wnd_and_gc(cht_t *h, size_t hash, walk_mode_t walk_mode,
831 wnd_t *wnd, bool *resizing)
832{
833 while (wnd->cur && node_hash(h, wnd->cur) < hash) {
834 /* GC any deleted nodes along the way to our desired node. */
835 if (N_DELETED & get_mark(wnd->cur->link)) {
836 if (!gc_deleted_node(h, walk_mode, wnd, resizing)) {
837 /* Failed to remove the garbage node. Retry. */
838 return false;
839 }
840 } else {
841 next_wnd(wnd);
842 }
843 }
844
845 /* wnd->cur may be 0 or even marked N_DELETED. */
846 return true;
847}
848
849static bool gc_deleted_node(cht_t *h, walk_mode_t walk_mode, wnd_t *wnd,
850 bool *resizing)
851{
852 ASSERT(N_DELETED & get_mark(wnd->cur->link));
853
854 /* Skip deleted JOIN nodes. */
855 if (walk_mode == WM_LEAVE_JOIN && (N_JOIN & get_mark(wnd->cur->link))) {
856 next_wnd(wnd);
857 } else {
858 /* Ordinary deleted node or a deleted JOIN_FOLLOWS. */
859 ASSERT(walk_mode != WM_LEAVE_JOIN
860 || !((N_JOIN | N_JOIN_FOLLOWS) & get_mark(wnd->cur->link)));
861
862 /* Unlink an ordinary deleted node, move JOIN_FOLLOWS mark. */
863 if (!unlink_from_pred(wnd, walk_mode, resizing)) {
 864 /* Retry. The predecessor was deleted, invalid, const or join_follows. */
865 return false;
866 }
867
868 free_later(h, wnd->cur);
869
870 /* Leave ppred as is. */
871 wnd->last = wnd->cur;
872 wnd->cur = get_next(wnd->cur->link);
873 }
874
875 return true;
876}
877
878static bool join_completed(cht_t *h, const wnd_t *wnd)
879{
880 /*
881 * The table is shrinking and the searched for item is in a bucket
882 * appended to another. Check that the link joining these two buckets
883 * is visible and if not, make it visible to this cpu.
884 */
885
886 /*
887 * Resizer ensures h->b->order stays the same for the duration of this
888 * func. We got here because there was an alternative head to search.
 889 * The resizer waits for all preexisting readers to finish after
 890 * it installs the new table and before it frees the old one.
891 */
892 ASSERT(h->b->order > h->new_b->order);
893
894 /* Either we did not need the joining link or we have already followed it.*/
895 if (wnd->cur)
896 return true;
897
898 /* We have reached the end of a bucket. */
899
900 if (wnd->last) {
901 size_t last_seen_hash = node_hash(h, wnd->last);
902 size_t last_old_idx = calc_bucket_idx(last_seen_hash, h->b->order);
903 size_t move_src_idx = grow_idx(shrink_idx(last_old_idx));
904
905 /*
906 * Last was in the joining bucket - if the searched for node is there
907 * we will find it.
908 */
909 if (move_src_idx != last_old_idx)
910 return true;
911 }
912
913 /*
914 * Reached the end of the bucket but no nodes from the joining bucket
915 * were seen. There should have at least been a JOIN node so we have
916 * definitely not seen (and followed) the joining link. Make the link
917 * visible and retry.
918 */
919 read_barrier();
920 return false;
921}
922
923static void upd_resizing_head(cht_t *h, size_t hash, marked_ptr_t **phead,
924 bool *join_finishing, walk_mode_t *walk_mode)
925{
926 cht_buckets_t *b = rcu_access(h->b);
927 size_t old_idx = calc_bucket_idx(hash, b->order);
928 size_t new_idx = calc_bucket_idx(hash, h->new_b->order);
929
930 marked_ptr_t *pold_head = &b->head[old_idx];
931 marked_ptr_t *pnew_head = &h->new_b->head[new_idx];
932
933 /* In any case, use the bucket in the new table. */
934 *phead = pnew_head;
935
936 /* Growing the table. */
937 if (b->order < h->new_b->order) {
938 size_t move_dest_idx = grow_idx(old_idx);
939 marked_ptr_t *pmoved_head = &h->new_b->head[move_dest_idx];
940
941 /* Complete moving the bucket from the old to the new table. */
942 help_head_move(pold_head, pmoved_head);
943
944 /* The hash belongs to the moved bucket. */
945 if (move_dest_idx == new_idx) {
946 ASSERT(pmoved_head == pnew_head);
947 /*
948 * move_head() makes the new head of the moved bucket visible.
 949 * The new head may be marked with a JOIN_FOLLOWS mark.
950 */
951 ASSERT(!(N_CONST & get_mark(*pmoved_head)));
952 *walk_mode = WM_MOVE_JOIN_FOLLOWS;
953 } else {
954 ASSERT(pmoved_head != pnew_head);
955 /*
956 * The hash belongs to the bucket that is the result of splitting
957 * the old/moved bucket, ie the bucket that contains the second
958 * half of the split/old/moved bucket.
959 */
960
961 /* The moved bucket has not yet been split. */
962 if (N_NORMAL != get_mark(*pnew_head)) {
963 size_t split_hash = calc_split_hash(new_idx, h->new_b->order);
964 split_bucket(h, pmoved_head, pnew_head, split_hash);
965 /*
966 * split_bucket() makes the new head visible. No
967 * JOIN_FOLLOWS in this part of split bucket.
968 */
969 ASSERT(N_NORMAL == get_mark(*pnew_head));
970 }
971
972 *walk_mode = WM_LEAVE_JOIN;
973 }
 974 } else if (h->new_b->order < b->order) {
975 /* Shrinking the table. */
976
977 size_t move_src_idx = grow_idx(new_idx);
978
979 /*
980 * Complete moving the bucket from the old to the new table.
981 * Makes a valid pnew_head visible if already moved.
982 */
983 help_head_move(&b->head[move_src_idx], pnew_head);
984
985 /* Hash belongs to the bucket to be joined with the moved bucket. */
986 if (move_src_idx != old_idx) {
987 /* Bucket join not yet completed. */
988 if (N_INVALID != get_mark(*pold_head)) {
989 size_t split_hash = calc_split_hash(old_idx, b->order);
990 join_buckets(h, pold_head, pnew_head, split_hash);
991 }
992
993 /* The resizer sets pold_head to 0 when all cpus see the bucket join.*/
994 *join_finishing = (0 != get_next(*pold_head));
995 }
996
997 /* move_head() or join_buckets() makes it so or makes the mark visible.*/
998 ASSERT(N_INVALID == get_mark(*pold_head));
999 /* move_head() makes it visible. No JOIN_FOLLOWS used when shrinking. */
1000 ASSERT(N_NORMAL == get_mark(*pnew_head));
1001
1002 *walk_mode = WM_LEAVE_JOIN;
1003 } else {
1004 /*
1005 * Final stage of resize. The resizer is waiting for all
1006 * readers to notice that the old table had been replaced.
1007 */
1008 ASSERT(b == h->new_b);
1009 *walk_mode = WM_NORMAL;
1010 }
1011}
1012
1013
1014#if 0
1015static void move_head(marked_ptr_t *psrc_head, marked_ptr_t *pdest_head)
1016{
1017 start_head_move(psrc_head);
1018 cas_order_barrier();
1019 complete_head_move(psrc_head, pdest_head);
1020}
1021#endif
1022
1023static void help_head_move(marked_ptr_t *psrc_head, marked_ptr_t *pdest_head)
1024{
 1025 /* Head move has to be in progress already when calling this func. */
1026 ASSERT(N_CONST & get_mark(*psrc_head));
1027
1028 /* Head already moved. */
1029 if (N_INVALID == get_mark(*psrc_head)) {
1030 /* Effects of the head move have not yet propagated to this cpu. */
1031 if (N_INVALID == get_mark(*pdest_head)) {
1032 /* Make the move visible on this cpu. */
1033 read_barrier();
1034 }
1035 } else {
1036 complete_head_move(psrc_head, pdest_head);
1037 }
1038
1039 ASSERT(!(N_CONST & get_mark(*pdest_head)));
1040}
1041
1042static void start_head_move(marked_ptr_t *psrc_head)
1043{
1044 /* Mark src head immutable. */
1045 mark_const(psrc_head);
1046}
1047
1048static void mark_const(marked_ptr_t *psrc_head)
1049{
1050 marked_ptr_t ret, src_link;
1051
1052 /* Mark src head immutable. */
1053 do {
1054 cht_link_t *next = get_next(*psrc_head);
1055 src_link = make_link(next, N_NORMAL);
1056
1057 /* Mark the normal/clean src link immutable/const. */
1058 ret = cas_link(psrc_head, next, N_NORMAL, next, N_CONST);
1059 } while(ret != src_link && !(N_CONST & get_mark(ret)));
1060}
1061
1062static void complete_head_move(marked_ptr_t *psrc_head, marked_ptr_t *pdest_head)
1063{
1064 ASSERT(N_JOIN_FOLLOWS != get_mark(*psrc_head));
1065 ASSERT(N_CONST & get_mark(*psrc_head));
1066
1067 cht_link_t *next = get_next(*psrc_head);
1068 marked_ptr_t ret;
1069
1070 ret = cas_link(pdest_head, 0, N_INVALID, next, N_NORMAL);
1071 ASSERT(ret == make_link(0, N_INVALID) || (N_NORMAL == get_mark(ret)));
1072 cas_order_barrier();
1073
1074 ret = cas_link(psrc_head, next, N_CONST, next, N_INVALID);
1075 ASSERT(ret == make_link(next, N_CONST) || (N_INVALID == get_mark(ret)));
1076 cas_order_barrier();
1077}
1078
1079static void split_bucket(cht_t *h, marked_ptr_t *psrc_head,
1080 marked_ptr_t *pdest_head, size_t split_hash)
1081{
1082 /* Already split. */
1083 if (N_NORMAL == get_mark(*pdest_head))
1084 return;
1085
1086 /*
1087 * L == Last node of the first part of the split bucket. That part
1088 * remains in the original/src bucket.
1089 * F == First node of the second part of the split bucket. That part
1090 * will be referenced from the dest bucket head.
1091 *
1092 * We want to first mark a clean L as JF so that updaters unaware of
1093 * the split (or table resize):
1094 * - do not insert a new node between L and F
1095 * - do not unlink L (that is why it has to be clean/normal)
1096 * - do not unlink F
1097 *
1098 * Then we can safely mark F as JN even if it has been marked deleted.
1099 * Once F is marked as JN updaters aware of table resize will not
1100 * attempt to unlink it (JN will have two predecessors - we cannot
1101 * safely unlink from both at the same time). Updaters unaware of
1102 * ongoing resize can reach F only via L and that node is already
1103 * marked JF, so they won't unlink F.
1104 *
1105 * Last, link the new/dest head to F.
1106 *
1107 *
1108 * 0) ,-- split_hash, first hash of the dest bucket
1109 * v
1110 * [src_head | N] -> .. -> [L] -> [F]
1111 * [dest_head | Inv]
1112 *
1113 * 1) ,-- split_hash
1114 * v
1115 * [src_head | N] -> .. -> [JF] -> [F]
1116 * [dest_head | Inv]
1117 *
1118 * 2) ,-- split_hash
1119 * v
1120 * [src_head | N] -> .. -> [JF] -> [JN]
1121 * [dest_head | Inv]
1122 *
 1123 * 3) ,-- split_hash
1124 * v
1125 * [src_head | N] -> .. -> [JF] -> [JN]
1126 * ^
1127 * [dest_head | N] -----------------'
1128 */
1129 wnd_t wnd;
1130
1131 rcu_read_lock();
1132
1133 /* Mark the last node of the first part of the split bucket as JF. */
1134 mark_join_follows(h, psrc_head, split_hash, &wnd);
1135 cas_order_barrier();
1136
1137 /* There are nodes in the dest bucket, ie the second part of the split. */
1138 if (wnd.cur) {
1139 /*
1140 * Mark the first node of the dest bucket as a join node so
1141 * updaters do not attempt to unlink it if it is deleted.
1142 */
1143 mark_join_node(wnd.cur);
1144 cas_order_barrier();
1145 } else {
1146 /*
1147 * Second part of the split bucket is empty. There are no nodes
1148 * to mark as JOIN nodes and there never will be.
1149 */
1150 }
1151
1152 /* Link the dest head to the second part of the split. */
1153 marked_ptr_t ret = cas_link(pdest_head, 0, N_INVALID, wnd.cur, N_NORMAL);
1154 ASSERT(ret == make_link(0, N_INVALID) || (N_NORMAL == get_mark(ret)));
1155 cas_order_barrier();
1156
1157 rcu_read_unlock();
1158}
1159
1160static void mark_join_follows(cht_t *h, marked_ptr_t *psrc_head,
1161 size_t split_hash, wnd_t *wnd)
1162{
1163 /* See comment in split_bucket(). */
1164
 1165 bool done = false;
1166 do {
1167 bool resizing = false;
1168 wnd->ppred = psrc_head;
1169 wnd->cur = get_next(*psrc_head);
1170
1171 /*
1172 * Find the split window, ie the last node of the first part of
 1173 * the split bucket and its successor - the first node of
1174 * the second part of the split bucket. Retry if GC failed.
1175 */
1176 if (!find_wnd_and_gc(h, split_hash, WM_MOVE_JOIN_FOLLOWS, wnd, &resizing))
1177 continue;
1178
1179 /* Must not report that the table is resizing if WM_MOVE_JOIN_FOLLOWS.*/
1180 ASSERT(!resizing);
1181 /*
1182 * Mark the last node of the first half of the split bucket
1183 * that a join node follows. It must be clean/normal.
1184 */
1185 marked_ptr_t ret
1186 = cas_link(wnd->ppred, wnd->cur, N_NORMAL, wnd->cur, N_JOIN_FOLLOWS);
1187
1188 /*
1189 * Successfully marked as a JF node or already marked that way (even
1190 * if also marked deleted - unlinking the node will move the JF mark).
1191 */
1192 done = (ret == make_link(wnd->cur, N_NORMAL))
1193 || (N_JOIN_FOLLOWS & get_mark(ret));
1194 } while (!done);
1195}
1196
1197static void mark_join_node(cht_link_t *join_node)
1198{
1199 /* See comment in split_bucket(). */
1200
1201 bool done;
1202 do {
1203 cht_link_t *next = get_next(join_node->link);
1204 mark_t mark = get_mark(join_node->link);
1205
1206 /*
1207 * May already be marked as deleted, but it won't be unlinked
1208 * because its predecessor is marked with JOIN_FOLLOWS or CONST.
1209 */
1210 marked_ptr_t ret
1211 = cas_link(&join_node->link, next, mark, next, mark | N_JOIN);
1212
1213 /* Successfully marked or already marked as a join node. */
1214 done = (ret == make_link(next, mark))
1215 || (N_JOIN & get_mark(ret));
1216 } while(!done);
1217}
1218
1219
1220static void join_buckets(cht_t *h, marked_ptr_t *psrc_head,
1221 marked_ptr_t *pdest_head, size_t split_hash)
1222{
1223 /* Buckets already joined. */
1224 if (N_INVALID == get_mark(*psrc_head))
1225 return;
1226 /*
1227 * F == First node of psrc_head, ie the bucket we want to append
1228 * to (ie join with) the bucket starting at pdest_head.
1229 * L == Last node of pdest_head, ie the bucket that psrc_head will
1230 * be appended to.
1231 *
1232 * (1) We first mark psrc_head immutable to signal that a join is
1233 * in progress and so that updaters unaware of the join (or table
1234 * resize):
1235 * - do not insert new nodes between the head psrc_head and F
1236 * - do not unlink F (it may already be marked deleted)
1237 *
1238 * (2) Next, F is marked as a join node. Updaters aware of table resize
1239 * will not attempt to unlink it. We cannot safely/atomically unlink
1240 * the join node because it will be pointed to from two different
1241 * buckets. Updaters unaware of resize will fail to unlink the join
1242 * node due to the head being marked immutable.
1243 *
1244 * (3) Then the tail of the bucket at pdest_head is linked to the join
1245 * node. From now on, nodes in both buckets can be found via pdest_head.
1246 *
1247 * (4) Last, mark immutable psrc_head as invalid. It signals updaters
1248 * that the join is complete and they can insert new nodes (originally
1249 * destined for psrc_head) into pdest_head.
1250 *
1251 * Note that pdest_head keeps pointing at the join node. This allows
1252 * lookups and updaters to determine if they should see a link between
1253 * the tail L and F when searching for nodes originally in psrc_head
1254 * via pdest_head. If they reach the tail of pdest_head without
1255 * encountering any nodes of psrc_head, either there were no nodes
1256 * in psrc_head to begin with or the link between L and F did not
1257 * yet propagate to their cpus. If psrc_head was empty, it remains
1258 * NULL. Otherwise psrc_head points to a join node (it will not be
1259 * unlinked until table resize completes) and updaters/lookups
1260 * should issue a read_barrier() to make the link [L]->[JN] visible.
1261 *
1262 * 0) ,-- split_hash, first hash of the src bucket
1263 * v
1264 * [dest_head | N]-> .. -> [L]
1265 * [src_head | N]--> [F] -> ..
1266 * ^
1267 * ` split_hash, first hash of the src bucket
1268 *
1269 * 1) ,-- split_hash
1270 * v
1271 * [dest_head | N]-> .. -> [L]
1272 * [src_head | C]--> [F] -> ..
1273 *
1274 * 2) ,-- split_hash
1275 * v
1276 * [dest_head | N]-> .. -> [L]
1277 * [src_head | C]--> [JN] -> ..
1278 *
1279 * 3) ,-- split_hash
1280 * v
1281 * [dest_head | N]-> .. -> [L] --+
1282 * v
1283 * [src_head | C]-------------> [JN] -> ..
1284 *
1285 * 4) ,-- split_hash
1286 * v
1287 * [dest_head | N]-> .. -> [L] --+
1288 * v
1289 * [src_head | Inv]-----------> [JN] -> ..
1290 */
1291
1292 rcu_read_lock();
1293
1294 /* Mark src_head immutable - signals updaters that bucket join started. */
1295 mark_const(psrc_head);
1296 cas_order_barrier();
1297
1298 cht_link_t *join_node = get_next(*psrc_head);
1299
1300 if (join_node) {
1301 mark_join_node(join_node);
1302 cas_order_barrier();
1303
1304 link_to_join_node(h, pdest_head, join_node, split_hash);
1305 cas_order_barrier();
1306 }
1307
1308 marked_ptr_t ret =
1309 cas_link(psrc_head, join_node, N_CONST, join_node, N_INVALID);
1310 ASSERT(ret == make_link(join_node, N_CONST) || (N_INVALID == get_mark(ret)));
1311 cas_order_barrier();
1312
1313 rcu_read_unlock();
1314}
1315
1316static void link_to_join_node(cht_t *h, marked_ptr_t *pdest_head,
1317 cht_link_t *join_node, size_t split_hash)
1318{
 1319 bool done = false;
1320 do {
1321 wnd_t wnd = {
1322 .ppred = pdest_head,
1323 .cur = get_next(*pdest_head)
1324 };
1325
1326 bool resizing = false;
1327
1328 if (!find_wnd_and_gc(h, split_hash, WM_LEAVE_JOIN, &wnd, &resizing))
1329 continue;
1330
1331 ASSERT(!resizing);
1332
1333 if (wnd.cur) {
1334 /* Must be from the new appended bucket. */
1335 ASSERT(split_hash <= node_hash(h, wnd.cur));
1336 return;
1337 }
1338
1339 /* Reached the tail of pdest_head - link it to the join node. */
1340 marked_ptr_t ret = cas_link(wnd.ppred, 0, N_NORMAL, join_node, N_NORMAL);
1341
1342 done = (ret == make_link(0, N_NORMAL));
1343 } while (!done);
1344}
1345
1346static void free_later(cht_t *h, cht_link_t *item)
1347{
1348 /*
1349 * remove_callback only works as rcu_func_t because rcu_link is the first
1350 * field in cht_link_t.
1351 */
1352 rcu_call(&item->rcu_link, (rcu_func_t)h->op->remove_callback);
1353
1354 item_removed(h);
1355}
1356
1357static void item_removed(cht_t *h)
1358{
1359 size_t items = (size_t) atomic_predec(&h->item_cnt);
1360 size_t bucket_cnt = (1 << h->b->order);
1361
1362 bool need_shrink = (items == CHT_MAX_LOAD * bucket_cnt / 4);
1363 bool missed_shrink = (items == CHT_MAX_LOAD * bucket_cnt / 8);
1364
1365 if ((need_shrink || missed_shrink) && h->b->order > h->min_order) {
1366 atomic_count_t resize_reqs = atomic_preinc(&h->resize_reqs);
1367 /* The first resize request. Start the resizer. */
1368 if (1 == resize_reqs) {
1369 workq_global_enqueue_noblock(&h->resize_work, resize_table);
1370 }
1371 }
1372}
1373
1374static void item_inserted(cht_t *h)
1375{
1376 size_t items = (size_t) atomic_preinc(&h->item_cnt);
1377 size_t bucket_cnt = (1 << h->b->order);
1378
1379 bool need_grow = (items == CHT_MAX_LOAD * bucket_cnt);
1380 bool missed_grow = (items == 2 * CHT_MAX_LOAD * bucket_cnt);
1381
1382 if ((need_grow || missed_grow) && h->b->order < CHT_MAX_ORDER) {
1383 atomic_count_t resize_reqs = atomic_preinc(&h->resize_reqs);
1384 /* The first resize request. Start the resizer. */
1385 if (1 == resize_reqs) {
1386 workq_global_enqueue_noblock(&h->resize_work, resize_table);
1387 }
1388 }
1389}
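/*
 * Worked example of the resize thresholds above: with CHT_MAX_LOAD == 2
 * and a 64 bucket table, a grow is requested when the item count reaches
 * 128 (or 256 if the first request was missed) and a shrink is requested
 * when it drops to 32 (or 16 if missed).
 */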
1390
1391static void resize_table(work_t *arg)
1392{
1393 cht_t *h = member_to_inst(arg, cht_t, resize_work);
1394
1395#ifdef CONFIG_DEBUG
1396 ASSERT(h->b);
1397 /* Make resize_reqs visible. */
1398 read_barrier();
1399 ASSERT(0 < atomic_get(&h->resize_reqs));
1400#endif
1401
 1402 bool done = false;
1403 do {
1404 /* Load the most recent h->item_cnt. */
1405 read_barrier();
1406 size_t cur_items = (size_t) atomic_get(&h->item_cnt);
1407 size_t bucket_cnt = (1 << h->b->order);
1408 size_t max_items = CHT_MAX_LOAD * bucket_cnt;
1409
1410 if (cur_items >= max_items && h->b->order < CHT_MAX_ORDER) {
1411 grow_table(h);
1412 } else if (cur_items <= max_items / 4 && h->b->order > h->min_order) {
1413 shrink_table(h);
1414 } else {
1415 /* Table is just the right size. */
1416 atomic_count_t reqs = atomic_predec(&h->resize_reqs);
1417 done = (reqs == 0);
1418 }
1419 } while (!done);
1420}
1421
1422static void grow_table(cht_t *h)
1423{
1424 if (h->b->order >= CHT_MAX_ORDER)
1425 return;
1426
1427 h->new_b = alloc_buckets(h->b->order + 1, true);
1428
1429 /* Failed to alloc a new table - try next time the resizer is run. */
1430 if (!h->new_b)
1431 return;
1432
1433 /* Wait for all readers and updaters to see the initialized new table. */
1434 rcu_synchronize();
1435 size_t old_bucket_cnt = (1 << h->b->order);
1436
1437 /*
1438 * Give updaters a chance to help out with the resize. Do the minimum
1439 * work needed to announce a resize is in progress, ie start moving heads.
1440 */
1441 for (size_t idx = 0; idx < old_bucket_cnt; ++idx) {
1442 start_head_move(&h->b->head[idx]);
1443 }
1444
1445 /* Order start_head_move() wrt complete_head_move(). */
1446 cas_order_barrier();
1447
1448 /* Complete moving heads and split any buckets not yet split by updaters. */
1449 for (size_t old_idx = 0; old_idx < old_bucket_cnt; ++old_idx) {
1450 marked_ptr_t *move_dest_head = &h->new_b->head[grow_idx(old_idx)];
1451 marked_ptr_t *move_src_head = &h->b->head[old_idx];
1452
1453 /* Head move not yet completed. */
1454 if (N_INVALID != get_mark(*move_src_head)) {
1455 complete_head_move(move_src_head, move_dest_head);
1456 }
1457
1458 size_t split_idx = grow_to_split_idx(old_idx);
1459 size_t split_hash = calc_split_hash(split_idx, h->new_b->order);
1460 marked_ptr_t *split_dest_head = &h->new_b->head[split_idx];
1461
1462 split_bucket(h, move_dest_head, split_dest_head, split_hash);
1463 }
1464
1465 /*
1466 * Wait for all updaters to notice the new heads. Once everyone sees
1467 * the invalid old bucket heads they will know a resize is in progress
1468 * and updaters will modify the correct new buckets.
1469 */
1470 rcu_synchronize();
1471
1472 /* Clear the JOIN_FOLLOWS mark and remove the link between the split buckets.*/
1473 for (size_t old_idx = 0; old_idx < old_bucket_cnt; ++old_idx) {
1474 size_t new_idx = grow_idx(old_idx);
1475
1476 cleanup_join_follows(h, &h->new_b->head[new_idx]);
1477 }
1478
1479 /*
1480 * Wait for everyone to notice that buckets were split, ie link connecting
1481 * the join follows and join node has been cut.
1482 */
1483 rcu_synchronize();
1484
1485 /* Clear the JOIN mark and GC any deleted join nodes. */
1486 for (size_t old_idx = 0; old_idx < old_bucket_cnt; ++old_idx) {
1487 size_t new_idx = grow_to_split_idx(old_idx);
1488
1489 cleanup_join_node(h, &h->new_b->head[new_idx]);
1490 }
1491
1492 /* Wait for everyone to see that the table is clear of any resize marks. */
1493 rcu_synchronize();
1494
1495 cht_buckets_t *old_b = h->b;
1496 rcu_assign(h->b, h->new_b);
1497
1498 /* Wait for everyone to start using the new table. */
1499 rcu_synchronize();
1500
1501 free(old_b);
1502
1503 /* Not needed; just for increased readability. */
1504 h->new_b = 0;
1505}
1506
1507static void shrink_table(cht_t *h)
1508{
1509 if (h->b->order <= h->min_order)
1510 return;
1511
1512 h->new_b = alloc_buckets(h->b->order - 1, true);
1513
1514 /* Failed to alloc a new table - try next time the resizer is run. */
1515 if (!h->new_b)
1516 return;
1517
1518 /* Wait for all readers and updaters to see the initialized new table. */
1519 rcu_synchronize();
1520
1521 size_t old_bucket_cnt = (1 << h->b->order);
1522
1523 /*
1524 * Give updaters a chance to help out with the resize. Do the minimum
1525 * work needed to announce a resize is in progress, ie start moving heads.
1526 */
1527 for (size_t old_idx = 0; old_idx < old_bucket_cnt; ++old_idx) {
1528 size_t new_idx = shrink_idx(old_idx);
1529
1530 /* This bucket should be moved. */
1531 if (grow_idx(new_idx) == old_idx) {
1532 start_head_move(&h->b->head[old_idx]);
1533 } else {
1534 /* This bucket should join the moved bucket once the move is done.*/
1535 }
1536 }
1537
1538 /* Order start_head_move() wrt to complete_head_move(). */
1539 cas_order_barrier();
1540
1541 /* Complete moving heads and join buckets with the moved buckets. */
1542 for (size_t old_idx = 0; old_idx < old_bucket_cnt; ++old_idx) {
1543 size_t new_idx = shrink_idx(old_idx);
1544 size_t move_src_idx = grow_idx(new_idx);
1545
1546 /* This bucket should be moved. */
1547 if (move_src_idx == old_idx) {
1548 /* Head move not yet completed. */
1549 if (N_INVALID != get_mark(h->b->head[old_idx])) {
1550 complete_head_move(&h->b->head[old_idx], &h->new_b->head[new_idx]);
1551 }
1552 } else {
1553 /* This bucket should join the moved bucket. */
1554 size_t split_hash = calc_split_hash(old_idx, h->b->order);
1555 join_buckets(h, &h->b->head[old_idx], &h->new_b->head[new_idx],
1556 split_hash);
1557 }
1558 }
1559
1560 /*
1561 * Wait for all updaters to notice the new heads. Once everyone sees
1562 * the invalid old bucket heads they will know a resize is in progress
1563 * and updaters will modify the correct new buckets.
1564 */
1565 rcu_synchronize();
1566
1567 /* Let everyone know joins are complete and fully visible. */
1568 for (size_t old_idx = 0; old_idx < old_bucket_cnt; ++old_idx) {
1569 size_t move_src_idx = grow_idx(shrink_idx(old_idx));
1570
1571 /* Set the invalid joinee head to NULL. */
1572 if (old_idx != move_src_idx) {
1573 ASSERT(N_INVALID == get_mark(h->b->head[old_idx]));
1574
1575 if (0 != get_next(h->b->head[old_idx]))
1576 h->b->head[old_idx] = make_link(0, N_INVALID);
1577 }
1578 }
1579
1580 /* todo comment join node vs reset joinee head*/
1581 rcu_synchronize();
1582
1583 size_t new_bucket_cnt = (1 << h->new_b->order);
1584
1585 /* Clear the JOIN mark and GC any deleted join nodes. */
1586 for (size_t new_idx = 0; new_idx < new_bucket_cnt; ++new_idx) {
1587 cleanup_join_node(h, &h->new_b->head[new_idx]);
1588 }
1589
1590 /* Wait for everyone to see that the table is clear of any resize marks. */
1591 rcu_synchronize();
1592
1593 cht_buckets_t *old_b = h->b;
1594 rcu_assign(h->b, h->new_b);
1595
1596 /* Wait for everyone to start using the new table. */
1597 rcu_synchronize();
1598
1599 free(old_b);
1600
1601 /* Not needed; just for increased readability. */
1602 h->new_b = 0;
1603}
1604
1605static void cleanup_join_node(cht_t *h, marked_ptr_t *new_head)
1606{
1607 rcu_read_lock();
1608
1609 cht_link_t *cur = get_next(*new_head);
1610
1611 while (cur) {
1612 /* Clear the join node's JN mark - even if it is marked as deleted. */
1613 if (N_JOIN & get_mark(cur->link)) {
1614 clear_join_and_gc(h, cur, new_head);
1615 break;
1616 }
1617
1618 cur = get_next(cur->link);
1619 }
1620
1621 rcu_read_unlock();
1622}
1623
1624static void clear_join_and_gc(cht_t *h, cht_link_t *join_node,
1625 marked_ptr_t *new_head)
1626{
1627 ASSERT(join_node && (N_JOIN & get_mark(join_node->link)));
1628
1629 bool done;
1630
1631 /* Clear the JN mark. */
1632 do {
1633 marked_ptr_t jn_link = join_node->link;
1634 cht_link_t *next = get_next(jn_link);
1635 /* Clear the JOIN mark but keep the DEL mark if present. */
1636 mark_t cleared_mark = get_mark(jn_link) & N_DELETED;
1637
1638 marked_ptr_t ret =
1639 _cas_link(&join_node->link, jn_link, make_link(next, cleared_mark));
1640
1641 /* Done if the mark was cleared. Retry if a new node was inserted. */
1642 done = (ret == jn_link);
1643 ASSERT(ret == jn_link || (get_mark(ret) & N_JOIN));
1644 } while (!done);
1645
1646 if (!(N_DELETED & get_mark(join_node->link)))
1647 return;
1648
1649 /* The join node had been marked as deleted - GC it. */
1650
1651 /* Clear the JOIN mark before trying to unlink the deleted join node.*/
1652 cas_order_barrier();
1653
1654 size_t jn_hash = node_hash(h, join_node);
1655 do {
1656 bool resizing = false;
1657
1658 wnd_t wnd = {
1659 .ppred = new_head,
1660 .cur = get_next(*new_head)
1661 };
1662
1663 done = find_wnd_and_gc_pred(h, jn_hash, WM_NORMAL, same_node_pred,
1664 join_node, &wnd, &resizing);
1665
1666 ASSERT(!resizing);
1667 } while (!done);
1668}
1669
1670static void cleanup_join_follows(cht_t *h, marked_ptr_t *new_head)
1671{
1672 ASSERT(new_head);
1673
1674 rcu_read_lock();
1675
1676 wnd_t wnd = {
1677 .ppred = 0,
1678 .cur = 0
1679 };
1680 marked_ptr_t *cur_link = new_head;
1681
1682 /*
1683 * Find the non-deleted node with a JF mark and clear the JF mark.
1684 * The JF node may be deleted and/or the mark moved to its neighbors
1685 * at any time. Therefore, we GC deleted nodes until we find the JF
1686 * node in order to remove stale/deleted JF nodes left behind eg by
1687 * delayed threads that did not yet get a chance to unlink the deleted
1688 * JF node and move its mark.
1689 *
1690 * Note that the head may be marked JF (but never DELETED).
1691 */
1692 while (true) {
1693 bool is_jf_node = N_JOIN_FOLLOWS & get_mark(*cur_link);
1694
1695 /* GC any deleted nodes on the way - even deleted JOIN_FOLLOWS. */
1696 if (N_DELETED & get_mark(*cur_link)) {
1697 ASSERT(cur_link != new_head);
1698 ASSERT(wnd.ppred && wnd.cur);
1699 ASSERT(cur_link == &wnd.cur->link);
1700
1701 bool dummy;
1702 bool deleted = gc_deleted_node(h, WM_MOVE_JOIN_FOLLOWS, &wnd, &dummy);
1703
1704 /* Failed to GC or collected a deleted JOIN_FOLLOWS. */
1705 if (!deleted || is_jf_node) {
1706 /* Retry from the head of the bucket. */
1707 cur_link = new_head;
1708 continue;
1709 }
1710 } else {
1711 /* Found a non-deleted JF. Clear its JF mark. */
1712 if (is_jf_node) {
1713 cht_link_t *next = get_next(*cur_link);
1714 marked_ptr_t ret
1715 = cas_link(cur_link, next, N_JOIN_FOLLOWS, 0, N_NORMAL);
1716
1717 ASSERT(!next || ((N_JOIN | N_JOIN_FOLLOWS) & get_mark(ret)));
1718
1719 /* Successfully cleared the JF mark of a non-deleted node. */
1720 if (ret == make_link(next, N_JOIN_FOLLOWS)) {
1721 break;
1722 } else {
1723 /*
1724 * The JF node had been deleted or a new node inserted
1725 * right after it. Retry from the head.
1726 */
1727 cur_link = new_head;
1728 continue;
1729 }
1730 } else {
1731 wnd.ppred = cur_link;
1732 wnd.cur = get_next(*cur_link);
1733 }
1734 }
1735
1736 /* We must encounter a JF node before we reach the end of the bucket. */
1737 ASSERT(wnd.cur);
1738 cur_link = &wnd.cur->link;
1739 }
1740
1741 rcu_read_unlock();
1742}
1743
1744
1745static size_t calc_split_hash(size_t split_idx, size_t order)
1746{
1747 ASSERT(1 <= order && order <= 8 * sizeof(size_t));
1748 return split_idx << (8 * sizeof(size_t) - order);
1749}
1750
1751static size_t calc_bucket_idx(size_t hash, size_t order)
1752{
1753 ASSERT(1 <= order && order <= 8 * sizeof(size_t));
1754 return hash >> (8 * sizeof(size_t) - order);
1755}
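/*
 * Worked example (assuming a 64-bit size_t): with order == 6 a bucket
 * index is simply the top 6 bits of the mixed hash, ie
 * calc_bucket_idx(hash, 6) == hash >> 58; conversely
 * calc_split_hash(idx, 6) == idx << 58 is the smallest hash that
 * calc_bucket_idx() maps to bucket idx.
 */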
1756
1757static size_t grow_to_split_idx(size_t old_idx)
1758{
1759 return grow_idx(old_idx) | 1;
1760}
1761
1762static size_t grow_idx(size_t idx)
1763{
1764 return idx << 1;
1765}
1766
1767static size_t shrink_idx(size_t idx)
1768{
1769 return idx >> 1;
1770}
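/*
 * Worked example of the index mappings: when growing, old bucket 5 turns
 * into new buckets grow_idx(5) == 10 (the moved bucket, holding the lower
 * half of its hashes) and grow_to_split_idx(5) == 11 (the upper half).
 * When shrinking, new bucket shrink_idx(10) == shrink_idx(11) == 5 is
 * formed by moving old bucket 10 and appending old bucket 11 to it.
 */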
1771
1772
1773static size_t key_hash(cht_t *h, void *key)
1774{
1775 return hash_mix(h->op->key_hash(key));
1776}
1777
1778static size_t node_hash(cht_t *h, const cht_link_t *item)
1779{
1780 return hash_mix(h->op->hash(item));
1781}
1782
1783
1784static marked_ptr_t make_link(const cht_link_t *next, mark_t mark)
1785{
1786 marked_ptr_t ptr = (marked_ptr_t) next;
1787
1788 ASSERT(!(ptr & N_MARK_MASK));
1789 ASSERT(!((unsigned)mark & ~N_MARK_MASK));
1790
1791 return ptr | mark;
1792}
1793
1794
1795static cht_link_t * get_next(marked_ptr_t link)
1796{
1797 return (cht_link_t*)(link & ~N_MARK_MASK);
1798}
1799
1800
1801static mark_t get_mark(marked_ptr_t link)
1802{
1803 return (mark_t)(link & N_MARK_MASK);
1804}
1805
1806
1807static void next_wnd(wnd_t *wnd)
1808{
1809 ASSERT(wnd);
1810 ASSERT(wnd->cur);
1811
1812 wnd->last = wnd->cur;
1813 wnd->ppred = &wnd->cur->link;
1814 wnd->cur = get_next(wnd->cur->link);
1815}
1816
1817
1818static bool same_node_pred(void *node, const cht_link_t *item2)
1819{
1820 const cht_link_t *item1 = (const cht_link_t*) node;
1821 return item1 == item2;
1822}
1823
1824static marked_ptr_t cas_link(marked_ptr_t *link, const cht_link_t *cur_next,
1825 mark_t cur_mark, const cht_link_t *new_next, mark_t new_mark)
1826{
1827 return _cas_link(link, make_link(cur_next, cur_mark),
1828 make_link(new_next, new_mark));
1829}
1830
1831static marked_ptr_t _cas_link(marked_ptr_t *link, marked_ptr_t cur,
1832 marked_ptr_t new)
1833{
1834 /*
1835 * cas(x) on the same location x on one cpu must be ordered, but do not
1836 * have to be ordered wrt to other cas(y) to a different location y
1837 * on the same cpu.
1838 *
1839 * cas(x) must act as a write barrier on x, ie if cas(x) succeeds
1840 * and is observed by another cpu, then all cpus must be able to
1841 * make the effects of cas(x) visible just by issuing a load barrier.
1842 * For example:
 1843 *   cpu1: cas(x, 0 -> 1), succeeds
 1844 *   cpu2: cas(x, 0 -> 1), fails
 1845 *   cpu2: MB
 1846 *   cpu2: y = 7
 1847 *   cpu3: sees y == 7
 1848 *
 1849 * A load barrier (loadMB) on cpu3 must then be enough to make cpu1's cas(x) visible there, ie x == 1.
1850 *
1851 * If cas() did not work this way:
1852 * - our head move protocol would not be correct.
1853 * - freeing an item linked to a moved head after another item was
1854 * inserted in front of it, would require more than one grace period.
1855 */
1856 void *ret = atomic_cas_ptr((void**)link, (void *)cur, (void *)new);
1857 return (marked_ptr_t) ret;
1858}
1859
1860static void cas_order_barrier(void)
1861{
1862 /* Make sure CAS to different memory locations are ordered. */
1863 write_barrier();
1864}
1865
1866
1867/** @}
1868 */
Note: See TracBrowser for help on using the repository browser.