comparison pthreads_kmeans.c @ 1:8e7bdab2840f

VPThread version working
author Merten Sach <msach@mailbox.tu-berlin.de>
date Tue, 16 Aug 2011 20:32:55 +0200
parents e69e4c2d612a
children
comparison
equal deleted inserted replaced
0:624ce82ad481 1:a5a36d33a5dd
22 #include <pthread.h> 22 #include <pthread.h>
23 #include <time.h> 23 #include <time.h>
24 #include <math.h> 24 #include <math.h>
25 #include "kmeans.h" 25 #include "kmeans.h"
26 26
27 #include "VPThread_lib/VPThread.h"
28
27 #define PREC 300 29 #define PREC 300
28 30
29 char __ProgrammName[] = "kmeans"; 31 struct barrier_t
30 char __DataSet[255]; 32 {
33 int counter;
34 int nthreads;
35 int32 mutex;
36 int32 cond;
37 };
38 typedef struct barrier_t barrier;
31 39
32 extern int nthreads; /* Thread count */ 40 extern int nthreads; /* Thread count */
33 double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */ 41 double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */
34 volatile int finished; 42 volatile int finished;
35 43
36 pthread_barrier_t barr; 44 barrier barr;
37 pthread_mutex_t lock1; 45 int32 lock1;
38 pthread_attr_t attr; 46 pthread_attr_t attr;
47
48 void inline barrier_init(barrier *barr, int nthreads, VirtProcr *VProc)
49 {
50 barr->counter = 0;
51 barr->nthreads = nthreads;
52 barr->mutex = VPThread__make_mutex(VProc);
53 barr->cond = VPThread__make_cond(barr->mutex, VProc);
54 }
55
56 void inline barrier_wait(barrier *barr, VirtProcr *VProc)
57 {
58 int i;
59
60 VPThread__mutex_lock(barr->mutex, VProc);
61 barr->counter++;
62 if(barr->counter == barr->nthreads)
63 {
64 barr->counter = 0;
65 for(i=0; i < barr->nthreads; i++)
66 VPThread__cond_signal(barr->cond, VProc);
67 }
68 else
69 {
70 VPThread__cond_wait(barr->cond, VProc);
71 }
72 VPThread__mutex_unlock(barr->mutex, VProc);
73 }
39 74
40 /* 75 /*
41 * Struct: input 76 * Struct: input
42 * ------------- 77 * -------------
43 * Encapsulates all the input data for the benchmark, i.e. the object list, 78 * Encapsulates all the input data for the benchmark, i.e. the object list,
91 } 126 }
92 } 127 }
93 return index; 128 return index;
94 } 129 }
95 130
96 void work(struct input *x){ 131 void work(struct input *x, VirtProcr *VProc){
97 int tid = x->t; 132 int tid = x->t;
98 double local_delta=0; 133 double local_delta=0;
99 int i; 134 int i;
100 for (i = tid; i < x->numObjs; i += nthreads) { 135 for (i = tid; i < x->numObjs; i += nthreads) {
101 /* find the array index of nearest cluster center */ 136 /* find the array index of nearest cluster center */
114 int j; 149 int j;
115 for (j=0; j < x->numCoords; j++) 150 for (j=0; j < x->numCoords; j++)
116 x->local_newClusters[tid][index][j] += x->objects[i][j]; 151 x->local_newClusters[tid][index][j] += x->objects[i][j];
117 152
118 } 153 }
119 pthread_mutex_lock(&lock1); 154 VPThread__mutex_lock(lock1, VProc);
120 delta +=local_delta; 155 delta +=local_delta;
121 pthread_mutex_unlock(&lock1); 156 VPThread__mutex_unlock(lock1, VProc);
122 } 157 }
158
123 /* 159 /*
124 * Function: thread function work 160 * Function: thread function work
125 * -------------- 161 * --------------
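126 * Worker function for threading. Work distribution is done so that each thread computes every nthreads-th object, starting at its own thread id. 162 * Worker function for threading. Work distribution is done so that each thread computes every nthreads-th object, starting at its own thread id.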
127 */ 163 */
128 void* tfwork(void *ip) 164 void tfwork(void *ip, VirtProcr *VProc)
129 { 165 {
130 struct input *x; 166 struct input *x;
131 x = (struct input *)ip; 167 x = (struct input *)ip;
132 168
133 for(;;){ 169 for(;;){
134 pthread_barrier_wait(&barr); 170 barrier_wait(&barr, VProc);
135 if (finished){ 171 if (finished){
136 break; 172 break;
137 } 173 }
138 work(x); 174 work(x, VProc);
139 pthread_barrier_wait(&barr); 175 barrier_wait(&barr, VProc);
140 } 176 }
141 177
142 pthread_exit(NULL); 178 //pthread_exit(NULL);
179 VPThread__dissipate_thread(VProc);
143 } 180 }
144 181
145 /* 182 /*
146 * Function: create_array_2d_f 183 * Function: create_array_2d_f
147 * -------------------------- 184 * --------------------------
148 * Allocates memory for a 2-dim double array as needed for the algorithm. 185 * Allocates memory for a 2-dim double array as needed for the algorithm.
149 */ 186 */
150 double** create_array_2d_f(int height, int width) { 187 double** create_array_2d_f(int height, int width, VirtProcr *VProc) {
151 double** ptr; 188 double** ptr;
152 int i; 189 int i;
153 ptr = calloc(height, sizeof(double*)); 190 ptr = VPThread__malloc(height * sizeof(double*), VProc);
154 assert(ptr != NULL); 191 assert(ptr != NULL);
155 ptr[0] = calloc(width * height, sizeof(double)); 192 ptr[0] = VPThread__malloc(width * height * sizeof(double), VProc);
156 assert(ptr[0] != NULL); 193 assert(ptr[0] != NULL);
157 /* Assign pointers correctly */ 194 /* Assign pointers correctly */
158 for(i = 1; i < height; i++) 195 for(i = 1; i < height; i++)
159 ptr[i] = ptr[i-1] + width; 196 ptr[i] = ptr[i-1] + width;
160 return ptr; 197 return ptr;
163 /* 200 /*
164 * Function: create_array_2d_i 201 * Function: create_array_2d_i
165 * -------------------------- 202 * --------------------------
166 * Allocates memory for a 2-dim integer array as needed for the algorithm. 203 * Allocates memory for a 2-dim integer array as needed for the algorithm.
167 */ 204 */
168 int** create_array_2d_i(int height, int width) { 205 int** create_array_2d_i(int height, int width, VirtProcr *VProc) {
169 int** ptr; 206 int** ptr;
170 int i; 207 int i;
171 ptr = calloc(height, sizeof(int*)); 208 ptr = VPThread__malloc(height * sizeof(int*), VProc);
172 assert(ptr != NULL); 209 assert(ptr != NULL);
173 ptr[0] = calloc(width * height, sizeof(int)); 210 ptr[0] = VPThread__malloc(width * height * sizeof(int), VProc);
174 assert(ptr[0] != NULL); 211 assert(ptr[0] != NULL);
175 /* Assign pointers correctly */ 212 /* Assign pointers correctly */
176 for(i = 1; i < height; i++) 213 for(i = 1; i < height; i++)
177 ptr[i] = ptr[i-1] + width; 214 ptr[i] = ptr[i-1] + width;
178 return ptr; 215 return ptr;
181 /* 218 /*
182 * Function: pthreads_kmeans 219 * Function: pthreads_kmeans
183 * ------------------------- 220 * -------------------------
184 * Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords]. 221 * Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords].
185 */ 222 */
186 double** pthreads_kmeans(int is_perform_atomic, /* in: */ 223 void pthreads_kmeans(void *data, VirtProcr *VProc)
187 double **objects, /* in: [numObjs][numCoords] */ 224 {
188 int numCoords, /* no. coordinates */ 225 struct call_data *cluster_data = (struct call_data*)data;
189 int numObjs, /* no. objects */ 226 //int is_perform_atomic = cluster_data->is_perform_atomic; /* in: */
190 int numClusters, /* no. clusters */ 227 double **objects = cluster_data->objects; /* in: [numObjs][numCoords] */
191 double threshold, /* % objects change membership */ 228 int numCoords = cluster_data->numCoords; /* no. coordinates */
192 int *membership) /* out: [numObjs] */ 229 int numObjs = cluster_data->numObjs; /* no. objects */
193 { 230 int numClusters = cluster_data->numClusters; /* no. clusters */
194 231 double threshold = cluster_data->threshold; /* % objects change membership */
195 int i, j, k, index, loop = 0, rc; 232 int *membership = cluster_data->membership; /* out: [numObjs] */
233
234 int i, j, k, loop = 0;
196 int *newClusterSize; /* [numClusters]: no. objects assigned in each 235 int *newClusterSize; /* [numClusters]: no. objects assigned in each
197 new cluster */ 236 new cluster */
198 double **clusters; /* out: [numClusters][numCoords] */ 237 double **clusters = cluster_data->clusters; /* out: [numClusters][numCoords] */
199 double **newClusters; /* [numClusters][numCoords] */ 238 double **newClusters; /* [numClusters][numCoords] */
200 double timing; 239 //double timing;
201 int **local_newClusterSize; /* [nthreads][numClusters] */ 240 int **local_newClusterSize; /* [nthreads][numClusters] */
202 double ***local_newClusters; /* [nthreads][numClusters][numCoords] */ 241 double ***local_newClusters; /* [nthreads][numClusters][numCoords] */
203 242
204 pthread_t *thread; 243 VirtProcr **thread;
205 244
206 /* === MEMORY SETUP === */ 245 /* === MEMORY SETUP === */
207 246
208 /* [numClusters] clusters of [numCoords] double coordinates each */ 247 /* [numClusters] clusters of [numCoords] double coordinates each */
209 clusters = create_array_2d_f(numClusters, numCoords); 248 //Set pointers
249 for(i = 1; i < numClusters; i++)
250 clusters[i] = clusters[i-1] + numCoords;
210 251
211 /* Pick first numClusters elements of objects[] as initial cluster centers */ 252 /* Pick first numClusters elements of objects[] as initial cluster centers */
212 for (i=0; i < numClusters; i++) 253 for (i=0; i < numClusters; i++)
213 for (j=0; j < numCoords; j++) 254 for (j=0; j < numCoords; j++)
214 clusters[i][j] = objects[i][j]; 255 clusters[i][j] = objects[i][j];
216 /* Initialize membership, no object belongs to any cluster yet */ 257 /* Initialize membership, no object belongs to any cluster yet */
217 for (i = 0; i < numObjs; i++) 258 for (i = 0; i < numObjs; i++)
218 membership[i] = -1; 259 membership[i] = -1;
219 260
220 /* newClusterSize holds information on the count of members in each cluster */ 261 /* newClusterSize holds information on the count of members in each cluster */
221 newClusterSize = (int*)calloc(numClusters, sizeof(int)); 262 newClusterSize = (int*)VPThread__malloc(numClusters * sizeof(int), VProc);
222 assert(newClusterSize != NULL); 263 assert(newClusterSize != NULL);
223 264
224 /* newClusters holds the coordinates of the freshly created clusters */ 265 /* newClusters holds the coordinates of the freshly created clusters */
225 newClusters = create_array_2d_f(numClusters, numCoords); 266 newClusters = create_array_2d_f(numClusters, numCoords, VProc);
226 local_newClusterSize = create_array_2d_i(nthreads, numClusters); 267 local_newClusterSize = create_array_2d_i(nthreads, numClusters, VProc);
227 268
228 /* local_newClusters is a 3D array */ 269 /* local_newClusters is a 3D array */
229 local_newClusters = (double***)malloc(nthreads * sizeof(double**)); 270 local_newClusters = (double***)VPThread__malloc(nthreads * sizeof(double**), VProc);
230 assert(local_newClusters != NULL); 271 assert(local_newClusters != NULL);
231 local_newClusters[0] = (double**) malloc(nthreads * numClusters * sizeof(double*)); 272 local_newClusters[0] = (double**) VPThread__malloc(nthreads * numClusters * sizeof(double*), VProc);
232 assert(local_newClusters[0] != NULL); 273 assert(local_newClusters[0] != NULL);
233 274
234 /* Set up the pointers */ 275 /* Set up the pointers */
235 for (i = 1; i < nthreads; i++) 276 for (i = 1; i < nthreads; i++)
236 local_newClusters[i] = local_newClusters[i-1] + numClusters; 277 local_newClusters[i] = local_newClusters[i-1] + numClusters;
237 278
238 for (i = 0; i < nthreads; i++) { 279 for (i = 0; i < nthreads; i++) {
239 for (j = 0; j < numClusters; j++) { 280 for (j = 0; j < numClusters; j++) {
240 local_newClusters[i][j] = (double*)calloc(numCoords, sizeof(double)); 281 local_newClusters[i][j] = (double*)VPThread__malloc(numCoords * sizeof(double), VProc);
241 assert(local_newClusters[i][j] != NULL); 282 assert(local_newClusters[i][j] != NULL);
242 } 283 }
243 } 284 }
244 /* Perform thread setup */ 285 /* Perform thread setup */
245 thread = (pthread_t*)calloc(nthreads, sizeof(pthread_t)); 286 thread = (VirtProcr**)VPThread__malloc(nthreads * sizeof(VirtProcr*), VProc);
246 287
247 printf("nthreads %d\n", nthreads); 288 barrier_init(&barr, nthreads, VProc);
248 pthread_barrier_init(&barr, NULL, nthreads); 289 lock1 = VPThread__make_mutex(VProc);
249 pthread_attr_init(&attr);
250 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
251 pthread_mutex_init(&lock1, NULL);
252 finished=0; 290 finished=0;
253 291
254 struct input *ip = malloc(nthreads * sizeof(struct input)); 292 struct input *ip = VPThread__malloc(nthreads * sizeof(struct input), VProc);
255 /* Provide thread-safe memory locations for each worker */ 293 /* Provide thread-safe memory locations for each worker */
256 for(i = 0; i < nthreads; i++){ 294 for(i = 0; i < nthreads; i++){
257 ip[i].t = i; 295 ip[i].t = i;
258 ip[i].objects=objects; 296 ip[i].objects=objects;
259 ip[i].clusters=clusters; 297 ip[i].clusters=clusters;
263 ip[i].numObjs=numObjs; 301 ip[i].numObjs=numObjs;
264 ip[i].numClusters=numClusters; 302 ip[i].numClusters=numClusters;
265 ip[i].numCoords=numCoords; 303 ip[i].numCoords=numCoords;
266 304
267 if (i>0){ 305 if (i>0){
268 rc = pthread_create(&thread[i], &attr, tfwork, (void *)&ip[i]); 306 thread[i] = VPThread__create_thread(tfwork, (void*)&ip[i], VProc);
269 if (rc) {
270 fprintf(stderr, "ERROR: Return Code For Thread Creation Is %d\n", rc);
271 exit(EXIT_FAILURE);
272 }
273 } 307 }
274 } 308 }
275 309
276 /* === COMPUTATIONAL PHASE === */ 310 /* === COMPUTATIONAL PHASE === */
277 311
278 do { 312 do {
279 delta = 0.0; 313 delta = 0.0;
280 pthread_barrier_wait(&barr); 314 barrier_wait(&barr, VProc);
281 work(&ip[0]); 315 work(&ip[0], VProc);
282 316
283 pthread_barrier_wait(&barr); 317 barrier_wait(&barr, VProc);
284 /* Let the main thread perform the array reduction */ 318 /* Let the main thread perform the array reduction */
285 for (i = 0; i < numClusters; i++) { 319 for (i = 0; i < numClusters; i++) {
286 for (j = 0; j < nthreads; j++) { 320 for (j = 0; j < nthreads; j++) {
287 newClusterSize[i] += local_newClusterSize[j][i]; 321 newClusterSize[i] += local_newClusterSize[j][i];
288 local_newClusterSize[j][i] = 0.0; 322 local_newClusterSize[j][i] = 0.0;
304 } 338 }
305 339
306 delta /= numObjs; 340 delta /= numObjs;
307 } while (loop++ < PREC && delta > threshold); 341 } while (loop++ < PREC && delta > threshold);
308 342
309 // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program, 343 // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program,
310 // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed 344 // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed
311 // iterations, therefore making benchmarking completely unreliable. 345 // iterations, therefore making benchmarking completely unreliable.
312 346
313 finished=1; 347 finished=1;
314 pthread_barrier_wait(&barr); 348 barrier_wait(&barr, VProc);
315 349
316 for(i = 1; i < nthreads; i++) { 350
317 rc = pthread_join(thread[i], NULL); 351 VPThread__free(ip, VProc);
318 if (rc) { 352 VPThread__free(thread, VProc);
319 fprintf(stderr, "ERROR: Return Code For Thread Join Is %d\n", rc); 353
320 exit(EXIT_FAILURE); 354 VPThread__free(local_newClusterSize[0], VProc);
321 } 355 VPThread__free(local_newClusterSize, VProc);
322 }
323
324 free(ip);
325 free(thread);
326 pthread_barrier_destroy(&barr);
327 pthread_mutex_destroy(&lock1);
328 pthread_attr_destroy(&attr);
329
330 free(local_newClusterSize[0]);
331 free(local_newClusterSize);
332 356
333 for (i = 0; i < nthreads; i++) 357 for (i = 0; i < nthreads; i++)
334 for (j = 0; j < numClusters; j++) 358 for (j = 0; j < numClusters; j++)
335 free(local_newClusters[i][j]); 359 VPThread__free(local_newClusters[i][j], VProc);
336 free(local_newClusters[0]); 360 VPThread__free(local_newClusters[0], VProc);
337 free(local_newClusters); 361 VPThread__free(local_newClusters, VProc);
338 362
339 free(newClusters[0]); 363 VPThread__free(newClusters[0], VProc);
340 free(newClusters); 364 VPThread__free(newClusters, VProc);
341 free(newClusterSize); 365 VPThread__free(newClusterSize, VProc);
342 return clusters; 366
343 } 367 (cluster_data)->clusters = clusters;
344 368
369 VPThread__dissipate_thread(VProc);
370 }
371