Created
November 2, 2015 13:53
-
-
Save kumagi/d259274270fdc1385f81 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#define _GNU_SOURCE 1
#include <sched.h> // sched_setaffinity
#include <stdint.h>
#include <stdio.h>
#include <limits.h>
#include <stdlib.h>
#include <stdint.h>
#include <pthread.h>
#include <assert.h>
#include <sys/time.h>
#include <unistd.h> // usleep, sysconf
#include <urcu-qsbr.h>
/* Compiler-only barrier: prevents the compiler from moving memory accesses
 * across this point. Spelled with the double-underscore keywords so it also
 * builds in strict ISO modes where plain `asm` is not a keyword. */
#define mb() __asm__ __volatile__("" : : : "memory")
/* Full memory barrier (compiler + hardware). Uses the GCC builtin instead of
 * a hard-coded x86 `mfence` so the file is not tied to one architecture. */
#define mf() __sync_synchronize()
//#define usleep(n)
/* Alignment, in bytes, requested for the shared structures below. */
#define CACHELINE 64
/* Per-thread identifier used as the index into wf_stack.states.
 * Index 0 is reserved (it doubles as the "no winner" marker in Node.winner). */
__thread size_t TID;
/* (setq compile-command "gcc wfstack2.c -O4 -lpthread -lurcu-qsbr -ggdb -Wall -o wfstack -DNDEBUG -flto")
*/
/* Payload type stored in the stack. */
typedef int Item;

/*
 * One element of the linked stack.
 *
 * NOTE(review): the aligned attribute is attached to the typedef, and nodes
 * are obtained from plain malloc(), so heap nodes are presumably not
 * guaranteed to be cache-line aligned - confirm whether that matters.
 */
typedef struct node {
  Item* item;          /* heap-allocated payload; NULL for the bottom marker node */
  volatile int winner; /* tid whose operation installed this node and is not yet
                          finished by help_finish(); 0 means neutral */
  struct node* next;   /* node below this one, NULL at the bottom */
  struct node* prev;   /* set by help_pop() to the head it replaced;
                          help_finish() reads prev->item as the pop result */
} Node __attribute__ ((aligned (CACHELINE)));
/*
 * Allocate and initialise a node.
 *
 * i: payload pointer stored as-is (may be NULL).
 * w: tid recorded in the node's winner field (0 = neutral).
 * n: node to link as `next` (may be NULL).
 *
 * Returns the new node with prev == NULL. Aborts the process if the
 * allocation fails, since no caller can handle a NULL node.
 */
static Node* new_node(Item* const i, const size_t w, Node* const n) {
  Node* const construct = malloc(sizeof *construct);
  if (construct == NULL) {
    perror("malloc");
    abort();
  }
  construct->item = i;
  construct->winner = (int)w; /* tids are small; the field is an int */
  construct->next = n;
  construct->prev = NULL;
  /* make the initialised fields visible before the pointer is handed out */
  mf();
  return construct;
}
typedef struct { | |
int is_push; | |
uint64_t phase; | |
int pending; | |
Item* item; | |
} State __attribute__ ((aligned (CACHELINE))); | |
typedef struct wf_stack { | |
Node* head; | |
State** states; | |
int thread_max; | |
} wf_stack __attribute__ ((aligned (CACHELINE)));; | |
static State* new_state(const uint64_t p, | |
const int pend, | |
const int push, | |
Item* const i) { | |
State* const construct = (State*)malloc(sizeof(State)); | |
construct->phase = p; | |
construct->pending = pend; | |
construct->is_push = push; | |
construct->item = i; | |
mf(); | |
return construct; | |
} | |
void print_state(const State* s) { | |
printf("{phase:%lu, %s, %s, %d}", | |
s->phase, | |
(s->pending == 1) ? "pending" : "none", | |
(s->is_push == 1) ? "push" : "pop", | |
(s->item == NULL) ? -1 : *s->item | |
); | |
fflush(stdout); | |
} | |
void stack_init(wf_stack* const stack, int threads); | |
void stack_destroy(wf_stack* const stack); | |
int is_empty(const wf_stack* const stack); | |
void push(wf_stack* const stack, const Item item); | |
int pop(wf_stack* const stack, Item* const result); | |
void dump(const wf_stack* const stack); | |
static uint64_t max_phase(const wf_stack* const stack); | |
static int scan(const wf_stack* const stack, uint64_t phase); | |
static int is_still_pending(const wf_stack* const stack, const int tid, const uint64_t phase); | |
static void help_push(wf_stack* const stack, const size_t tid, const uint64_t phase); | |
static void help_pop(wf_stack* const stack, const int tid, const uint64_t phase); | |
static void help(wf_stack* const stack, const uint64_t phase); | |
static void help_finish(wf_stack* const stack); | |
/*
 * Initialise an empty stack for `threads` participating threads.
 *
 * The head starts as a marker node with no item, and every tid in
 * 1..threads gets an idle (non-pending) announcement record. Slot 0 of the
 * states array is left NULL because tid 0 means "no winner".
 *
 * Aborts the process if the states array cannot be allocated.
 */
void stack_init(wf_stack* const stack, int threads) {
  int i;
  stack->head = new_node(NULL, 0, NULL);
  stack->states = malloc(sizeof *stack->states * ((size_t)threads + 1));
  if (stack->states == NULL) {
    perror("malloc");
    abort();
  }
  stack->states[0] = NULL;
  for (i = 1; i <= threads; ++i) {
    stack->states[i] = new_state(0, 0, 0, NULL);
  }
  stack->thread_max = threads;
  /* publish the fully built structure before any thread starts using it */
  mf();
}
int is_empty(const wf_stack* const stack) { | |
Node* got_node = stack->head; | |
return got_node->item == NULL && got_node->winner == 0 && got_node->next == NULL; | |
} | |
/*
 * Release one announcement record.
 *
 * Despite its name this frees a State, not the stack. Push and pop records
 * are released the same way, so no branch on the request kind is needed.
 * The record's item pointer is not freed here.
 */
void free_stack(State* target) {
  free(target);
}
/*
 * Release everything owned by the stack: all nodes reachable from head, the
 * items still stored in those nodes, every per-thread announcement record
 * and the states array itself. The wf_stack object itself is not freed.
 *
 * No synchronisation is performed here; presumably this must only be called
 * once no other thread is using the stack.
 */
void stack_destroy(wf_stack* const stack) {
  int i;
  Node* ptr = stack->head;
  while (ptr) {
    Node* const next = ptr->next;
    /* items are heap-allocated by push() and owned by the node while they
       are still in the stack; free(NULL) is a no-op for the marker node */
    free(ptr->item);
    free(ptr);
    ptr = next;
  }
  stack->head = NULL;
  for (i = 1; i <= stack->thread_max; ++i) {
    free(stack->states[i]);
  }
  free(stack->states);
  stack->states = NULL;
}
static uint64_t max_phase(const wf_stack* const stack) { | |
uint64_t result = 0; | |
int i; | |
for (i = 1; i <= stack->thread_max; ++i) { | |
result = result > stack->states[i]->phase ? result : stack->states[i]->phase; | |
} | |
return result; | |
} | |
static int is_still_pending(const wf_stack* const stack, | |
const int tid, | |
const uint64_t phase) { | |
return stack->states[tid]->pending && | |
stack->states[tid]->phase <= phase; | |
} | |
static int scan(const wf_stack* const stack, uint64_t phase) { | |
int i; | |
uint64_t min_phase = ~0LLU; | |
int min_phase_tid = 0; | |
for (i = 1; i <= stack->thread_max; ++i) { | |
if (stack->states[i]->phase < min_phase && | |
is_still_pending(stack, i, phase)) { | |
min_phase = stack->states[i]->phase; | |
min_phase_tid = i; | |
} | |
} | |
return min_phase_tid; | |
} | |
/*
 * Keep applying the oldest pending request (of any thread) until scan()
 * finds no pending request with a phase up to `phase`.
 */
static void help(wf_stack* const stack, const uint64_t phase) {
  size_t target = scan(stack, phase);
  while (target != 0) {
    if (!stack->states[target]->is_push) {
      help_pop(stack, target, stack->states[target]->phase);
    } else {
      help_push(stack, target, stack->states[target]->phase);
    }
    target = scan(stack, phase);
  }
}
/*
 * Push a copy of `item` onto the stack on behalf of the calling thread.
 *
 * The calling thread's TID selects its announcement slot; the previous
 * record in that slot must not be pending. The request is announced with a
 * phase one larger than any phase currently visible, then help() is run
 * until the request (and any older ones) have been applied.
 *
 * Aborts the process if the item copy cannot be allocated.
 */
void push(wf_stack* const stack, const Item item) {
  rcu_quiescent_state();
  const uint64_t phase = max_phase(stack) + 1;
  State* const old_state = stack->states[TID];
  assert(!old_state->pending);
  Item* const push_item = malloc(sizeof *push_item);
  if (push_item == NULL) {
    perror("malloc");
    abort();
  }
  mf();
  *push_item = item; /* duplicate */
  State* push_state = new_state(phase, 1, 1, push_item);
  mf();
  /* publishing the record is what announces the request to other threads */
  stack->states[TID] = push_state;
  help(stack, phase);
  //help_finish(stack);
  /* wait for a grace period before releasing the replaced record, since
     other threads may still be reading it */
  synchronize_rcu();
  free(old_state);
}
/*
 * Try to apply the push request announced by thread `tid` at `phase`.
 *
 * Any thread may call this for any tid. A candidate node carrying the
 * announced item is prepared once and installed as the new head with
 * compare-and-swap while the request is still pending. If another thread
 * completes the request first, the candidate node is discarded.
 */
static void help_push(wf_stack* const stack,
                      const size_t tid,
                      const uint64_t phase) {
  Node* new_head = new_node(stack->states[tid]->item, tid, NULL);
  while (is_still_pending(stack, tid, phase)) {
    rcu_quiescent_state();
    Node* const old_head = stack->head;
    const int old_winner = old_head->winner;
    /* re-read head so that old_winner belongs to the node still on top */
    if (old_head != stack->head) {
      continue;
    }
    if (old_winner == 0) {
      /* neutral state: no unfinished operation on the current head */
      new_head->next = old_head;
      if (is_still_pending(stack, tid, phase)) {
        /* type-generic builtin: no pointer-to-integer casts and no
           assumption that pointers are 8 bytes wide */
        if (__sync_bool_compare_and_swap(&stack->head, old_head, new_head)) {
          /* from this moment new_head is observable by other threads */
          help_finish(stack);
          return;
        }
      }
    } else {
      /* someone's operation is half done; finish it before retrying */
      help_finish(stack);
    }
  }
  /* the push was already completed by another thread, so discard our node */
  free(new_head);
}
/*
 * Pop the top item on behalf of the calling thread.
 *
 * The request is announced in the caller's slot (selected by TID) with a
 * phase one larger than any phase currently visible, then help() is run.
 * Afterwards the item recorded in the caller's announcement is the result.
 *
 * Returns 1 and stores the value in *result when an item was obtained,
 * or 0 (leaving *result untouched) when the recorded item is NULL.
 */
int pop(wf_stack* const stack, Item* const result) {
  rcu_quiescent_state();
  const uint64_t my_phase = max_phase(stack) + 1;
  State* previous = stack->states[TID];
  assert(!previous->pending);
  State* request = new_state(my_phase, 1, 0, NULL);
  stack->states[TID] = request;
  help(stack, my_phase);
  //help_finish(stack);
  Item* const popped = stack->states[TID]->item;
  int found = 0;
  if (popped != NULL) {
    *result = *popped;
    found = 1;
  }
  /* the replaced record may still be read by helpers until a grace period ends */
  synchronize_rcu();
  free(previous);
  if (found) {
    free(popped);
  }
  return found;
}
/*
 * Try to apply the pop request announced by thread `tid` at `phase`.
 *
 * Any thread may call this for any tid. A replacement head is prepared that
 * stands in for BOTH the current head and the node below it: it takes over
 * the lower node's item and next pointer, and remembers the removed head in
 * `prev` so help_finish() can hand its item to the requesting thread. After
 * a successful swap and a grace period, the two replaced nodes are freed.
 *
 * Declared static in the prototype above; repeated here for consistency.
 */
static void help_pop(wf_stack* const stack,
                     const int tid,
                     const uint64_t phase) {
  Node* new_head = new_node(NULL, tid, NULL);
  while (is_still_pending(stack, tid, phase)) {
    rcu_quiescent_state();
    Node* const old_head = stack->head;
    Node* const next = old_head->next;
    const int old_winner = old_head->winner;
    /* re-read head so that next/old_winner belong to the node still on top */
    if (old_head != stack->head) {
      continue;
    }
    if (old_winner != 0) {
      /* someone's operation is half done; finish it before retrying */
      help_finish(stack);
      continue;
    }
    if (!is_still_pending(stack, tid, phase)) {
      continue;
    }
    new_head->prev = old_head;
    if (next) {
      new_head->item = next->item;
      new_head->next = next->next;
    } else {
      /* new_head is reused across retries: clear values copied during an
         earlier failed attempt so stale pointers are never published */
      new_head->item = NULL;
      new_head->next = NULL;
    }
    /* type-generic builtin: no pointer-to-integer casts and no assumption
       that pointers are 8 bytes wide */
    if (__sync_bool_compare_and_swap(&stack->head, old_head, new_head)) {
      help_finish(stack);
      /* other threads may still hold old_head/next until a grace period ends */
      synchronize_rcu();
      free(next); /* free(NULL) is a no-op */
      free(old_head);
      return;
    }
  }
  /* the request was completed elsewhere, so throw away the unused node */
  free(new_head);
}
/*
 * Complete the operation whose node currently sits at the head, if any.
 *
 * When the head names a winner tid, that thread's request is marked done:
 * for a pop, the item of the removed node (head->prev) is recorded as the
 * result; then pending is cleared and finally the head's winner field is
 * reset to 0 (neutral).
 */
void help_finish(wf_stack* const stack) {
  Node* const top = stack->head;
  const int owner = top->winner;
  State* const owner_state = stack->states[owner];
  /* if head changed, the winner has already been cleared by someone else;
     a winner of 0 means there is nothing to finish */
  if (top != stack->head || owner == 0) {
    return;
  }
  if (owner_state->is_push == 0) {
    owner_state->item = top->prev->item;
  }
  /* the result must be visible before pending is dropped ... */
  mf();
  owner_state->pending = 0;
  /* ... and pending must be dropped before the head becomes neutral */
  mf();
  top->winner = 0;
}
/* ---------------------- */ | |
typedef struct { | |
size_t tid; | |
int num; | |
wf_stack* stack; | |
pthread_mutex_t* regist_lock; | |
pthread_barrier_t* barrier; | |
} workingset; | |
void* work(void* data) { | |
workingset* my_ws = (workingset*)data; | |
wf_stack* stack = my_ws->stack; | |
TID = my_ws->tid; | |
cpu_set_t mask; | |
CPU_ZERO(&mask); | |
CPU_SET(TID % CORES, &mask); | |
if (sched_setaffinity(0, sizeof(mask), &mask) == -1) { | |
perror("setaffinity:"); | |
exit(1); | |
} | |
pthread_mutex_lock(my_ws->regist_lock); | |
rcu_register_thread(); | |
pthread_mutex_unlock(my_ws->regist_lock); | |
pthread_barrier_wait(my_ws->barrier); | |
assert(stack->head); | |
int i; | |
for (i = 0; i < my_ws->num; ++i) { | |
//printf("%d ", i); | |
//fflush(stdout); | |
push(stack, i); | |
} | |
//dump(stack); | |
//printf("tid %zd push done\n", my_ws->tid); | |
for (i = 0; i < my_ws->num; ++i) { | |
int result = i; | |
int success = pop(stack, &result); | |
//printf("%d ", result); | |
//fflush(stdout); | |
if (!success) { | |
printf("pop failed\n"); | |
abort(); | |
} | |
} | |
rcu_thread_offline(); | |
//printf("tid %d pop done\n", my_ws->tid); | |
pthread_barrier_wait(my_ws->barrier); | |
pthread_mutex_lock(my_ws->regist_lock); | |
rcu_unregister_thread(); | |
pthread_mutex_unlock(my_ws->regist_lock); | |
return NULL; | |
} | |
void dump(const wf_stack* const stack) { | |
// not thread safe | |
const Node* ptr = stack->head; | |
int ret = 0; | |
//printf("dump:%x\n", head); | |
while (ptr) { | |
//ret += ptr->data + ptr->winner; | |
if (ptr->item == NULL) { | |
break; | |
} | |
printf("{d:%d,w:%d,%p}->", *ptr->item, ptr->winner, ptr->next); | |
fflush(stdout); | |
ptr = ptr->next; | |
} | |
printf("%d(NULL)\n", ret); | |
} | |
/*
 * Return the current wall-clock time from gettimeofday() as seconds,
 * with the microsecond part in the fraction.
 */
double now(){
  struct timeval tv;
  gettimeofday(&tv, NULL);
  const double whole_seconds = (double)tv.tv_sec;
  const double microseconds = (double)tv.tv_usec;
  return whole_seconds + microseconds * 1e-6;
}
int main(int argc, char** argv) { | |
if (argc != 2) { | |
printf("specify thread num\n"); | |
return 1; | |
} | |
const int thread_max = atoi(argv[1]); | |
pthread_t thread[thread_max]; | |
workingset wk[thread_max]; | |
const int num = 10000; | |
wf_stack stack; | |
stack_init(&stack, thread_max); | |
pthread_mutex_t regist_lock; | |
pthread_barrier_t regist_barrier; | |
// init shared data | |
pthread_mutex_init(®ist_lock, NULL); | |
pthread_barrier_init(®ist_barrier, NULL, thread_max + 1); | |
int i; | |
for (i = 0; i < thread_max; ++i) { | |
wk[i].tid = i + 1; | |
wk[i].num = num; | |
wk[i].stack = &stack; | |
wk[i].regist_lock = ®ist_lock; | |
wk[i].barrier = ®ist_barrier; | |
mf(); | |
pthread_create(&thread[i], NULL, work, &wk[i]); | |
} | |
usleep(5000); | |
const double start = now(); | |
pthread_barrier_wait(®ist_barrier); | |
pthread_barrier_wait(®ist_barrier); | |
const double finish = now(); | |
for (i = 0; i < thread_max; ++i) { | |
pthread_join(thread[i], NULL); | |
} | |
pthread_barrier_destroy(®ist_barrier); | |
pthread_mutex_destroy(®ist_lock); | |
int result; | |
TID = 1; | |
int should_fail = pop(&stack, &result); | |
printf("stack is successfully empty:%d\n", should_fail); | |
assert(should_fail == 0); | |
//dump(&stack); | |
assert(is_empty(&stack)); | |
stack_destroy(&stack); | |
printf("push: %d pop:%d done.\n", num * thread_max, num * thread_max + 1); | |
printf("%lf query / sec\n", thread_max * num / (finish - start)); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment