Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > Vthread > Vthread__KMeans__Bench
comparison pthreads_kmeans.c @ 1:8e7bdab2840f
VPThread version working
| author | Merten Sach <msach@mailbox.tu-berlin.de> |
|---|---|
| date | Tue, 16 Aug 2011 20:32:55 +0200 |
| parents | e69e4c2d612a |
| children | |
comparison
equal
deleted
inserted
replaced
| 0:624ce82ad481 | 1:a5a36d33a5dd |
|---|---|
| 22 #include <pthread.h> | 22 #include <pthread.h> |
| 23 #include <time.h> | 23 #include <time.h> |
| 24 #include <math.h> | 24 #include <math.h> |
| 25 #include "kmeans.h" | 25 #include "kmeans.h" |
| 26 | 26 |
| 27 #include "VPThread_lib/VPThread.h" | |
| 28 | |
| 27 #define PREC 300 | 29 #define PREC 300 |
| 28 | 30 |
| 29 char __ProgrammName[] = "kmeans"; | 31 struct barrier_t |
| 30 char __DataSet[255]; | 32 { |
| 33 int counter; | |
| 34 int nthreads; | |
| 35 int32 mutex; | |
| 36 int32 cond; | |
| 37 }; | |
| 38 typedef struct barrier_t barrier; | |
| 31 | 39 |
| 32 extern int nthreads; /* Thread count */ | 40 extern int nthreads; /* Thread count */ |
| 33 double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */ | 41 double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */ |
| 34 volatile int finished; | 42 volatile int finished; |
| 35 | 43 |
| 36 pthread_barrier_t barr; | 44 barrier barr; |
| 37 pthread_mutex_t lock1; | 45 int32 lock1; |
| 38 pthread_attr_t attr; | 46 pthread_attr_t attr; |
| 47 | |
| 48 void inline barrier_init(barrier *barr, int nthreads, VirtProcr *VProc) | |
| 49 { | |
| 50 barr->counter = 0; | |
| 51 barr->nthreads = nthreads; | |
| 52 barr->mutex = VPThread__make_mutex(VProc); | |
| 53 barr->cond = VPThread__make_cond(barr->mutex, VProc); | |
| 54 } | |
| 55 | |
| 56 void inline barrier_wait(barrier *barr, VirtProcr *VProc) | |
| 57 { | |
| 58 int i; | |
| 59 | |
| 60 VPThread__mutex_lock(barr->mutex, VProc); | |
| 61 barr->counter++; | |
| 62 if(barr->counter == barr->nthreads) | |
| 63 { | |
| 64 barr->counter = 0; | |
| 65 for(i=0; i < barr->nthreads; i++) | |
| 66 VPThread__cond_signal(barr->cond, VProc); | |
| 67 } | |
| 68 else | |
| 69 { | |
| 70 VPThread__cond_wait(barr->cond, VProc); | |
| 71 } | |
| 72 VPThread__mutex_unlock(barr->mutex, VProc); | |
| 73 } | |
| 39 | 74 |
| 40 /* | 75 /* |
| 41 * Struct: input | 76 * Struct: input |
| 42 * ------------- | 77 * ------------- |
| 43 * Encapsulates all the input data for the benchmark, i.e. the object list, | 78 * Encapsulates all the input data for the benchmark, i.e. the object list, |
| 91 } | 126 } |
| 92 } | 127 } |
| 93 return index; | 128 return index; |
| 94 } | 129 } |
| 95 | 130 |
| 96 void work(struct input *x){ | 131 void work(struct input *x, VirtProcr *VProc){ |
| 97 int tid = x->t; | 132 int tid = x->t; |
| 98 double local_delta=0; | 133 double local_delta=0; |
| 99 int i; | 134 int i; |
| 100 for (i = tid; i < x->numObjs; i += nthreads) { | 135 for (i = tid; i < x->numObjs; i += nthreads) { |
| 101 /* find the array index of nearest cluster center */ | 136 /* find the array index of nearest cluster center */ |
| 114 int j; | 149 int j; |
| 115 for (j=0; j < x->numCoords; j++) | 150 for (j=0; j < x->numCoords; j++) |
| 116 x->local_newClusters[tid][index][j] += x->objects[i][j]; | 151 x->local_newClusters[tid][index][j] += x->objects[i][j]; |
| 117 | 152 |
| 118 } | 153 } |
| 119 pthread_mutex_lock(&lock1); | 154 VPThread__mutex_lock(lock1, VProc); |
| 120 delta +=local_delta; | 155 delta +=local_delta; |
| 121 pthread_mutex_unlock(&lock1); | 156 VPThread__mutex_unlock(lock1, VProc); |
| 122 } | 157 } |
| 158 | |
| 123 /* | 159 /* |
| 124 * Function: thread function work | 160 * Function: thread function work |
| 125 * -------------- | 161 * -------------- |
| 126 * Worker function for threading. Work distribution is done so that each thread computers | 162 * Worker function for threading. Work distribution is done so that each thread computers |
| 127 */ | 163 */ |
| 128 void* tfwork(void *ip) | 164 void tfwork(void *ip, VirtProcr *VProc) |
| 129 { | 165 { |
| 130 struct input *x; | 166 struct input *x; |
| 131 x = (struct input *)ip; | 167 x = (struct input *)ip; |
| 132 | 168 |
| 133 for(;;){ | 169 for(;;){ |
| 134 pthread_barrier_wait(&barr); | 170 barrier_wait(&barr, VProc); |
| 135 if (finished){ | 171 if (finished){ |
| 136 break; | 172 break; |
| 137 } | 173 } |
| 138 work(x); | 174 work(x, VProc); |
| 139 pthread_barrier_wait(&barr); | 175 barrier_wait(&barr, VProc); |
| 140 } | 176 } |
| 141 | 177 |
| 142 pthread_exit(NULL); | 178 //pthread_exit(NULL); |
| 179 VPThread__dissipate_thread(VProc); | |
| 143 } | 180 } |
| 144 | 181 |
| 145 /* | 182 /* |
| 146 * Function: create_array_2d_f | 183 * Function: create_array_2d_f |
| 147 * -------------------------- | 184 * -------------------------- |
| 148 * Allocates memory for a 2-dim double array as needed for the algorithm. | 185 * Allocates memory for a 2-dim double array as needed for the algorithm. |
| 149 */ | 186 */ |
| 150 double** create_array_2d_f(int height, int width) { | 187 double** create_array_2d_f(int height, int width, VirtProcr *VProc) { |
| 151 double** ptr; | 188 double** ptr; |
| 152 int i; | 189 int i; |
| 153 ptr = calloc(height, sizeof(double*)); | 190 ptr = VPThread__malloc(height * sizeof(double*), VProc); |
| 154 assert(ptr != NULL); | 191 assert(ptr != NULL); |
| 155 ptr[0] = calloc(width * height, sizeof(double)); | 192 ptr[0] = VPThread__malloc(width * height * sizeof(double), VProc); |
| 156 assert(ptr[0] != NULL); | 193 assert(ptr[0] != NULL); |
| 157 /* Assign pointers correctly */ | 194 /* Assign pointers correctly */ |
| 158 for(i = 1; i < height; i++) | 195 for(i = 1; i < height; i++) |
| 159 ptr[i] = ptr[i-1] + width; | 196 ptr[i] = ptr[i-1] + width; |
| 160 return ptr; | 197 return ptr; |
| 163 /* | 200 /* |
| 164 * Function: create_array_2Dd_i | 201 * Function: create_array_2Dd_i |
| 165 * -------------------------- | 202 * -------------------------- |
| 166 * Allocates memory for a 2-dim integer array as needed for the algorithm. | 203 * Allocates memory for a 2-dim integer array as needed for the algorithm. |
| 167 */ | 204 */ |
| 168 int** create_array_2d_i(int height, int width) { | 205 int** create_array_2d_i(int height, int width, VirtProcr *VProc) { |
| 169 int** ptr; | 206 int** ptr; |
| 170 int i; | 207 int i; |
| 171 ptr = calloc(height, sizeof(int*)); | 208 ptr = VPThread__malloc(height * sizeof(int*), VProc); |
| 172 assert(ptr != NULL); | 209 assert(ptr != NULL); |
| 173 ptr[0] = calloc(width * height, sizeof(int)); | 210 ptr[0] = VPThread__malloc(width * height * sizeof(int), VProc); |
| 174 assert(ptr[0] != NULL); | 211 assert(ptr[0] != NULL); |
| 175 /* Assign pointers correctly */ | 212 /* Assign pointers correctly */ |
| 176 for(i = 1; i < height; i++) | 213 for(i = 1; i < height; i++) |
| 177 ptr[i] = ptr[i-1] + width; | 214 ptr[i] = ptr[i-1] + width; |
| 178 return ptr; | 215 return ptr; |
| 181 /* | 218 /* |
| 182 * Function: pthreads_kmeans | 219 * Function: pthreads_kmeans |
| 183 * ------------------------- | 220 * ------------------------- |
| 184 * Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords]. | 221 * Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords]. |
| 185 */ | 222 */ |
| 186 double** pthreads_kmeans(int is_perform_atomic, /* in: */ | 223 void pthreads_kmeans(void *data, VirtProcr *VProc) |
| 187 double **objects, /* in: [numObjs][numCoords] */ | 224 { |
| 188 int numCoords, /* no. coordinates */ | 225 struct call_data *cluster_data = (struct call_data*)data; |
| 189 int numObjs, /* no. objects */ | 226 //int is_perform_atomic = cluster_data->is_perform_atomic; /* in: */ |
| 190 int numClusters, /* no. clusters */ | 227 double **objects = cluster_data->objects; /* in: [numObjs][numCoords] */ |
| 191 double threshold, /* % objects change membership */ | 228 int numCoords = cluster_data->numCoords; /* no. coordinates */ |
| 192 int *membership) /* out: [numObjs] */ | 229 int numObjs = cluster_data->numObjs; /* no. objects */ |
| 193 { | 230 int numClusters = cluster_data->numClusters; /* no. clusters */ |
| 194 | 231 double threshold = cluster_data->threshold; /* % objects change membership */ |
| 195 int i, j, k, index, loop = 0, rc; | 232 int *membership = cluster_data->membership; /* out: [numObjs] */ |
| 233 | |
| 234 int i, j, k, loop = 0; | |
| 196 int *newClusterSize; /* [numClusters]: no. objects assigned in each | 235 int *newClusterSize; /* [numClusters]: no. objects assigned in each |
| 197 new cluster */ | 236 new cluster */ |
| 198 double **clusters; /* out: [numClusters][numCoords] */ | 237 double **clusters = cluster_data->clusters; /* out: [numClusters][numCoords] */ |
| 199 double **newClusters; /* [numClusters][numCoords] */ | 238 double **newClusters; /* [numClusters][numCoords] */ |
| 200 double timing; | 239 //double timing; |
| 201 int **local_newClusterSize; /* [nthreads][numClusters] */ | 240 int **local_newClusterSize; /* [nthreads][numClusters] */ |
| 202 double ***local_newClusters; /* [nthreads][numClusters][numCoords] */ | 241 double ***local_newClusters; /* [nthreads][numClusters][numCoords] */ |
| 203 | 242 |
| 204 pthread_t *thread; | 243 VirtProcr **thread; |
| 205 | 244 |
| 206 /* === MEMORY SETUP === */ | 245 /* === MEMORY SETUP === */ |
| 207 | 246 |
| 208 /* [numClusters] clusters of [numCoords] double coordinates each */ | 247 /* [numClusters] clusters of [numCoords] double coordinates each */ |
| 209 clusters = create_array_2d_f(numClusters, numCoords); | 248 //Set pointers |
| 249 for(i = 1; i < numClusters; i++) | |
| 250 clusters[i] = clusters[i-1] + numCoords; | |
| 210 | 251 |
| 211 /* Pick first numClusters elements of objects[] as initial cluster centers */ | 252 /* Pick first numClusters elements of objects[] as initial cluster centers */ |
| 212 for (i=0; i < numClusters; i++) | 253 for (i=0; i < numClusters; i++) |
| 213 for (j=0; j < numCoords; j++) | 254 for (j=0; j < numCoords; j++) |
| 214 clusters[i][j] = objects[i][j]; | 255 clusters[i][j] = objects[i][j]; |
| 216 /* Initialize membership, no object belongs to any cluster yet */ | 257 /* Initialize membership, no object belongs to any cluster yet */ |
| 217 for (i = 0; i < numObjs; i++) | 258 for (i = 0; i < numObjs; i++) |
| 218 membership[i] = -1; | 259 membership[i] = -1; |
| 219 | 260 |
| 220 /* newClusterSize holds information on the count of members in each cluster */ | 261 /* newClusterSize holds information on the count of members in each cluster */ |
| 221 newClusterSize = (int*)calloc(numClusters, sizeof(int)); | 262 newClusterSize = (int*)VPThread__malloc(numClusters * sizeof(int), VProc); |
| 222 assert(newClusterSize != NULL); | 263 assert(newClusterSize != NULL); |
| 223 | 264 |
| 224 /* newClusters holds the coordinates of the freshly created clusters */ | 265 /* newClusters holds the coordinates of the freshly created clusters */ |
| 225 newClusters = create_array_2d_f(numClusters, numCoords); | 266 newClusters = create_array_2d_f(numClusters, numCoords, VProc); |
| 226 local_newClusterSize = create_array_2d_i(nthreads, numClusters); | 267 local_newClusterSize = create_array_2d_i(nthreads, numClusters, VProc); |
| 227 | 268 |
| 228 /* local_newClusters is a 3D array */ | 269 /* local_newClusters is a 3D array */ |
| 229 local_newClusters = (double***)malloc(nthreads * sizeof(double**)); | 270 local_newClusters = (double***)VPThread__malloc(nthreads * sizeof(double**), VProc); |
| 230 assert(local_newClusters != NULL); | 271 assert(local_newClusters != NULL); |
| 231 local_newClusters[0] = (double**) malloc(nthreads * numClusters * sizeof(double*)); | 272 local_newClusters[0] = (double**) VPThread__malloc(nthreads * numClusters * sizeof(double*), VProc); |
| 232 assert(local_newClusters[0] != NULL); | 273 assert(local_newClusters[0] != NULL); |
| 233 | 274 |
| 234 /* Set up the pointers */ | 275 /* Set up the pointers */ |
| 235 for (i = 1; i < nthreads; i++) | 276 for (i = 1; i < nthreads; i++) |
| 236 local_newClusters[i] = local_newClusters[i-1] + numClusters; | 277 local_newClusters[i] = local_newClusters[i-1] + numClusters; |
| 237 | 278 |
| 238 for (i = 0; i < nthreads; i++) { | 279 for (i = 0; i < nthreads; i++) { |
| 239 for (j = 0; j < numClusters; j++) { | 280 for (j = 0; j < numClusters; j++) { |
| 240 local_newClusters[i][j] = (double*)calloc(numCoords, sizeof(double)); | 281 local_newClusters[i][j] = (double*)VPThread__malloc(numCoords * sizeof(double), VProc); |
| 241 assert(local_newClusters[i][j] != NULL); | 282 assert(local_newClusters[i][j] != NULL); |
| 242 } | 283 } |
| 243 } | 284 } |
| 244 /* Perform thread setup */ | 285 /* Perform thread setup */ |
| 245 thread = (pthread_t*)calloc(nthreads, sizeof(pthread_t)); | 286 thread = (VirtProcr**)VPThread__malloc(nthreads * sizeof(VirtProcr*), VProc); |
| 246 | 287 |
| 247 printf("nthreads %d\n", nthreads); | 288 barrier_init(&barr, nthreads, VProc); |
| 248 pthread_barrier_init(&barr, NULL, nthreads); | 289 lock1 = VPThread__make_mutex(VProc); |
| 249 pthread_attr_init(&attr); | |
| 250 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); | |
| 251 pthread_mutex_init(&lock1, NULL); | |
| 252 finished=0; | 290 finished=0; |
| 253 | 291 |
| 254 struct input *ip = malloc(nthreads * sizeof(struct input)); | 292 struct input *ip = VPThread__malloc(nthreads * sizeof(struct input), VProc); |
| 255 /* Provide thread-safe memory locations for each worker */ | 293 /* Provide thread-safe memory locations for each worker */ |
| 256 for(i = 0; i < nthreads; i++){ | 294 for(i = 0; i < nthreads; i++){ |
| 257 ip[i].t = i; | 295 ip[i].t = i; |
| 258 ip[i].objects=objects; | 296 ip[i].objects=objects; |
| 259 ip[i].clusters=clusters; | 297 ip[i].clusters=clusters; |
| 263 ip[i].numObjs=numObjs; | 301 ip[i].numObjs=numObjs; |
| 264 ip[i].numClusters=numClusters; | 302 ip[i].numClusters=numClusters; |
| 265 ip[i].numCoords=numCoords; | 303 ip[i].numCoords=numCoords; |
| 266 | 304 |
| 267 if (i>0){ | 305 if (i>0){ |
| 268 rc = pthread_create(&thread[i], &attr, tfwork, (void *)&ip[i]); | 306 thread[i] = VPThread__create_thread(tfwork, (void*)&ip[i], VProc); |
| 269 if (rc) { | |
| 270 fprintf(stderr, "ERROR: Return Code For Thread Creation Is %d\n", rc); | |
| 271 exit(EXIT_FAILURE); | |
| 272 } | |
| 273 } | 307 } |
| 274 } | 308 } |
| 275 | 309 |
| 276 /* === COMPUTATIONAL PHASE === */ | 310 /* === COMPUTATIONAL PHASE === */ |
| 277 | 311 |
| 278 do { | 312 do { |
| 279 delta = 0.0; | 313 delta = 0.0; |
| 280 pthread_barrier_wait(&barr); | 314 barrier_wait(&barr, VProc); |
| 281 work(&ip[0]); | 315 work(&ip[0], VProc); |
| 282 | 316 |
| 283 pthread_barrier_wait(&barr); | 317 barrier_wait(&barr, VProc); |
| 284 /* Let the main thread perform the array reduction */ | 318 /* Let the main thread perform the array reduction */ |
| 285 for (i = 0; i < numClusters; i++) { | 319 for (i = 0; i < numClusters; i++) { |
| 286 for (j = 0; j < nthreads; j++) { | 320 for (j = 0; j < nthreads; j++) { |
| 287 newClusterSize[i] += local_newClusterSize[j][i]; | 321 newClusterSize[i] += local_newClusterSize[j][i]; |
| 288 local_newClusterSize[j][i] = 0.0; | 322 local_newClusterSize[j][i] = 0.0; |
| 304 } | 338 } |
| 305 | 339 |
| 306 delta /= numObjs; | 340 delta /= numObjs; |
| 307 } while (loop++ < PREC && delta > threshold); | 341 } while (loop++ < PREC && delta > threshold); |
| 308 | 342 |
| 309 // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program, | 343 // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program, |
| 310 // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed | 344 // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed |
| 311 // iterations, therefore making benchmarking completely unreliable. | 345 // iterations, therefore making benchmarking completely unreliable. |
| 312 | 346 |
| 313 finished=1; | 347 finished=1; |
| 314 pthread_barrier_wait(&barr); | 348 barrier_wait(&barr, VProc); |
| 315 | 349 |
| 316 for(i = 1; i < nthreads; i++) { | 350 |
| 317 rc = pthread_join(thread[i], NULL); | 351 VPThread__free(ip, VProc); |
| 318 if (rc) { | 352 VPThread__free(thread, VProc); |
| 319 fprintf(stderr, "ERROR: Return Code For Thread Join Is %d\n", rc); | 353 |
| 320 exit(EXIT_FAILURE); | 354 VPThread__free(local_newClusterSize[0], VProc); |
| 321 } | 355 VPThread__free(local_newClusterSize, VProc); |
| 322 } | |
| 323 | |
| 324 free(ip); | |
| 325 free(thread); | |
| 326 pthread_barrier_destroy(&barr); | |
| 327 pthread_mutex_destroy(&lock1); | |
| 328 pthread_attr_destroy(&attr); | |
| 329 | |
| 330 free(local_newClusterSize[0]); | |
| 331 free(local_newClusterSize); | |
| 332 | 356 |
| 333 for (i = 0; i < nthreads; i++) | 357 for (i = 0; i < nthreads; i++) |
| 334 for (j = 0; j < numClusters; j++) | 358 for (j = 0; j < numClusters; j++) |
| 335 free(local_newClusters[i][j]); | 359 VPThread__free(local_newClusters[i][j], VProc); |
| 336 free(local_newClusters[0]); | 360 VPThread__free(local_newClusters[0], VProc); |
| 337 free(local_newClusters); | 361 VPThread__free(local_newClusters, VProc); |
| 338 | 362 |
| 339 free(newClusters[0]); | 363 VPThread__free(newClusters[0], VProc); |
| 340 free(newClusters); | 364 VPThread__free(newClusters, VProc); |
| 341 free(newClusterSize); | 365 VPThread__free(newClusterSize, VProc); |
| 342 return clusters; | 366 |
| 343 } | 367 (cluster_data)->clusters = clusters; |
| 344 | 368 |
| 369 VPThread__dissipate_thread(VProc); | |
| 370 } | |
| 371 |
