// kthread.cu (forked from lh3/minimap2)
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include <stdint.h>
#include "kthread.cuh"
#include "map.cuh"
#include <cuda.h>
#if (defined(WIN32) || defined(_WIN32)) && defined(_MSC_VER)
#define __sync_fetch_and_add(ptr, addend) _InterlockedExchangeAdd((void*)ptr, addend)
#endif
/************
* kt_for() *
************/
struct kt_for_t;

typedef struct {
	struct kt_for_t *t;
	long i;
} ktf_worker_t;

typedef struct kt_for_t {
	int n_threads;
	long n;
	ktf_worker_t *w;
	void (*func)(void*,long,int);
	void *data;
} kt_for_t;
// find the worker that has made the least progress and steal the next chunk of its iteration range
static inline long steal_work(kt_for_t *t)
{
	int i, min_i = -1;
	long k, min = LONG_MAX;
	for (i = 0; i < t->n_threads; ++i)
		if (min > t->w[i].i) min = t->w[i].i, min_i = i;
	k = __sync_fetch_and_add(&t->w[min_i].i, t->n_threads);
	return k >= t->n? -1 : k;
}
static void *ktf_worker(void *data)
{
	ktf_worker_t *w = (ktf_worker_t*)data;
	long i;
	for (;;) { // grab iterations in strides of n_threads until this worker's own range is exhausted
		i = __sync_fetch_and_add(&w->i, w->t->n_threads);
		if (i >= w->t->n) break;
		w->t->func(w->t->data, i, w - w->t->w);
	}
	while ((i = steal_work(w->t)) >= 0) // then steal leftover iterations from slower workers
		w->t->func(w->t->data, i, w - w->t->w);
	pthread_exit(0);
}
void kt_for(int n_threads, void (*func)(void*,long,int), void *data, long n)
{
	if (n_threads > 1) {
		int i;
		kt_for_t t;
		pthread_t *tid;
		t.func = func, t.data = data, t.n_threads = n_threads, t.n = n;
		t.w = (ktf_worker_t*)calloc(n_threads, sizeof(ktf_worker_t));
		tid = (pthread_t*)calloc(n_threads, sizeof(pthread_t));
		for (i = 0; i < n_threads; ++i)
			t.w[i].t = &t, t.w[i].i = i;
		for (i = 0; i < n_threads; ++i) pthread_create(&tid[i], 0, ktf_worker, &t.w[i]);
		for (i = 0; i < n_threads; ++i) pthread_join(tid[i], 0);
		free(tid); free(t.w);
	} else {
		long j;
		for (j = 0; j < n; ++j) func(data, j, 0);
	}
}
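
/* Illustrative usage sketch, not part of the original source: kt_for() calls
 * func(data, i, tid) for every i in [0, n) across n_threads worker threads.
 * The guard macro, struct and function names below are hypothetical and only
 * show the expected callback signature. */
#ifdef KTHREAD_USAGE_EXAMPLE
typedef struct { const float *in; float *out; } sq_aux_t;

static void sq_step(void *data, long i, int tid) // tid identifies the worker thread running this iteration
{
	sq_aux_t *a = (sq_aux_t*)data;
	a->out[i] = a->in[i] * a->in[i]; // iterations must be independent of one another
}

static void sq_all(const float *in, float *out, long n)
{
	sq_aux_t a = { in, out };
	kt_for(4, sq_step, &a, n); // 4 worker threads; kt_for falls back to a serial loop if n_threads <= 1
}
#endif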
/*****************
* kt_pipeline() *
*****************/
// Each pipeline worker runs in its own pthread; see ktp_worker() below and kt_pipeline() at the end of this file.
static void *ktp_worker(void *data)
{
	ktp_worker_t *w = (ktp_worker_t*)data; // the worker this thread is responsible for
	ktp_t *p = w->pl; // the pipeline this worker belongs to
	while (w->step < p->n_steps) {
		// test whether we can kick off the job with this worker
		pthread_mutex_lock(&p->mutex); // the mutex protects the workers' step/index bookkeeping
		for (;;) {
			int i;
			// test whether another worker is doing the same step
			for (i = 0; i < p->n_workers; ++i) {
				if (w == &p->workers[i]) continue; // ignore itself
				if (p->workers[i].step <= w->step && p->workers[i].index < w->index)
					break; // an earlier batch has not finished this step yet
			}
			if (i == p->n_workers) break; // no workers with smaller indices are doing w->step or the previous steps
			pthread_cond_wait(&p->cv, &p->mutex); // wait until another worker finishes a step and broadcasts
		}
		pthread_mutex_unlock(&p->mutex);

		// working on w->step; for the first step the input is NULL
		w->data = p->func(p->shared, w->step, w->step? w->data : 0);

		// update step and let other workers know
		pthread_mutex_lock(&p->mutex);
		// advance to the next step, wrapping to 0 after the last step; if a non-final step
		// returned NULL the input is exhausted, so set step to n_steps to leave the while loop
		w->step = w->step == p->n_steps - 1 || w->data? (w->step + 1) % p->n_steps : p->n_steps;
		if (w->step == 0) w->index = p->index++; // starting a new batch: take the next batch index so batches stay ordered
		pthread_cond_broadcast(&p->cv);
		pthread_mutex_unlock(&p->mutex);
	}
	pthread_exit(0);
}
// N = total number of workers; workers = device array of worker structs; shared_data = device copy of the shared data
__global__ void cuda_worker(int N, ktp_worker_t *workers, void *shared_data)
{
	extern __shared__ float s_tmp[]; // dynamic shared memory; only usable if the launch specifies a size
	float *shared = s_tmp;
	printf("output\n"); // device-side printf; fprintf is not available in device code
	int t = threadIdx.x;
	int b = blockIdx.x;
	int B = blockDim.x;
	int n = t + b*B;
	ktp_worker_t *w = n < N? &workers[n] : 0; // threads with n >= N still take part in __syncthreads() below
	ktp_t *p = w? w->pl : 0; // NB: pl was set on the host; it must point to device-visible memory for this to work
	if (w && w->step == 0)
		w->data = p->func(p->shared, w->step, w->step? w->data : 0);
	__syncthreads(); // every worker finishes step 0 before any worker starts step 1
	if (w) w->step = w->step + 1;
	if (w && w->step == 1)
		w->data = p->func(p->shared, w->step, w->step? w->data : 0);
	__syncthreads();
	if (w) w->step = w->step + 1;
	if (w && w->step == 2)
		w->data = p->func(p->shared, w->step, w->step? w->data : 0);
	__syncthreads();
}
// c_func holds a device-side pointer to worker_caller (defined in map.cuh) so the host
// can fetch it with cudaMemcpyFromSymbol() and hand it to the kernel
typedef void* (*ktp_func_t)(void* shared, int step, void* in);
__device__ ktp_func_t c_func = worker_caller;
void cuda_pipeline(int N, void *(*func)(void*, int, void*), void *shared_data, int n_steps)
{
	ktp_t aux;
	void *c_shared;
	int i;
	if (N < 1) N = 1;
	aux.n_workers = N; // one worker per CUDA thread
	aux.n_steps = n_steps; // number of pipeline steps
	aux.func = func; // the step function
	aux.shared = shared_data; // data shared by every step invocation
	aux.index = 0;
	aux.workers = (ktp_worker_t*)calloc(N, sizeof(ktp_worker_t)); // host-side worker array
	for (i = 0; i < N; ++i) { // assign an index to each worker
		ktp_worker_t *w = &aux.workers[i];
		w->step = 0; w->pl = &aux; w->data = 0; // w->pl points back to the pipeline struct
		w->index = aux.index++;
	}
	cudaMalloc(&aux.c_workers, N*sizeof(ktp_worker_t));
	cudaMalloc(&c_shared, sizeof(shared_data)); // NB: sizeof(shared_data) is only the size of a pointer, not of the structure it points to
	cudaMemcpy(aux.c_workers, aux.workers, N*sizeof(ktp_worker_t), cudaMemcpyHostToDevice);
	cudaMemcpy(c_shared, aux.shared, sizeof(shared_data), cudaMemcpyHostToDevice);
	// replace the host function pointer with a device-callable pointer to worker_caller;
	// a host function pointer cannot be called from device code
	cudaMemcpyFromSymbol(&aux.func, c_func, sizeof(aux.func));
	int B = 64; // threads per block
	int G = (N+B-1)/B; // enough blocks to cover all N workers
	cuda_worker<<<G, B>>>(N, (ktp_worker_t*)aux.c_workers, c_shared); // pass the device array itself, not the address of the host-side pointer
	cudaMemcpy(aux.workers, aux.c_workers, N*sizeof(ktp_worker_t), cudaMemcpyDeviceToHost);
	cudaMemcpy(aux.shared, c_shared, sizeof(shared_data), cudaMemcpyDeviceToHost);
	cudaFree(aux.c_workers); // cudaFree takes the device pointer itself
	free(aux.workers);
}
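
/* cuda_pipeline() above does not check the return status of the CUDA runtime calls.
 * A minimal error-checking sketch, not part of the original source; the guard and
 * macro names are hypothetical, the runtime functions (cudaGetErrorString,
 * cudaGetLastError) are standard CUDA API. */
#ifdef KTHREAD_USAGE_EXAMPLE
#include <cuda_runtime.h>
#define CUDA_CHECK(call) do { \
	cudaError_t e_ = (call); \
	if (e_ != cudaSuccess) { \
		fprintf(stderr, "CUDA error %s at %s:%d\n", cudaGetErrorString(e_), __FILE__, __LINE__); \
		exit(1); \
	} \
} while (0)
// e.g. CUDA_CHECK(cudaMalloc(&aux.c_workers, N*sizeof(ktp_worker_t)));
// and  CUDA_CHECK(cudaGetLastError()); right after the kernel launch
#endif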
// Creates n_threads workers that cooperatively run the steps of the pipeline defined by func.
// n_threads   = number of worker threads
// func        = the step function; func(shared_data, step, in) returns the data handed to the next step
// shared_data = data shared by every invocation of func
// n_steps     = the number of steps in the pipeline
void kt_pipeline(int n_threads, void *(*func)(void*, int, void*), void *shared_data, int n_steps)
{
	ktp_t aux;
	pthread_t *tid;
	int i;
	if (n_threads < 1) n_threads = 1;
	aux.n_workers = n_threads; // one worker per thread
	aux.n_steps = n_steps; // number of steps
	aux.func = func; // the step function
	aux.shared = shared_data; // data shared by every step invocation
	aux.index = 0;
	pthread_mutex_init(&aux.mutex, 0);
	pthread_cond_init(&aux.cv, 0);
	aux.workers = (ktp_worker_t*)calloc(n_threads, sizeof(ktp_worker_t)); // one worker struct per thread
	for (i = 0; i < n_threads; ++i) { // assign an index to each worker
		ktp_worker_t *w = &aux.workers[i];
		w->step = 0; w->pl = &aux; w->data = 0; // w->pl points back to the pipeline struct
		w->index = aux.index++;
	}
	tid = (pthread_t*)calloc(n_threads, sizeof(pthread_t));
	for (i = 0; i < n_threads; ++i) pthread_create(&tid[i], 0, ktp_worker, &aux.workers[i]); // each thread runs ktp_worker on its own worker
	for (i = 0; i < n_threads; ++i) pthread_join(tid[i], 0); // wait for all workers to finish
	free(tid); free(aux.workers);
	pthread_mutex_destroy(&aux.mutex);
	pthread_cond_destroy(&aux.cv);
}
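
/* Illustrative sketch, not part of the original source: a 3-step read/transform/write
 * pipeline in the style in which minimap2 uses kt_pipeline(). Step 0 reads a batch and
 * ends the pipeline by returning NULL; step 1 transforms it; step 2 writes and frees it.
 * The guard macro, struct and function names below are hypothetical. */
#ifdef KTHREAD_USAGE_EXAMPLE
#include <ctype.h>
typedef struct { FILE *in, *out; } pl_shared_t;
typedef struct { char buf[1024]; } pl_batch_t;

static void *pl_step(void *shared, int step, void *in)
{
	pl_shared_t *s = (pl_shared_t*)shared;
	if (step == 0) { // step 0: read one batch; returning NULL terminates the pipeline
		pl_batch_t *b = (pl_batch_t*)calloc(1, sizeof(pl_batch_t));
		if (fgets(b->buf, sizeof(b->buf), s->in)) return b;
		free(b);
		return 0;
	} else if (step == 1) { // step 1: transform the batch in place
		pl_batch_t *b = (pl_batch_t*)in;
		for (char *q = b->buf; *q; ++q) *q = toupper((unsigned char)*q);
		return b;
	} else { // step 2: write the batch and free it
		pl_batch_t *b = (pl_batch_t*)in;
		fputs(b->buf, s->out);
		free(b);
		return 0;
	}
}
// kt_pipeline(3, pl_step, &shared, 3) overlaps reading, transforming and writing
// across 3 threads while keeping batches in input order.
#endif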