Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > Vthread > Vthread__KMeans__Bench
comparison pthreads_kmeans.c @ 0:e69e4c2d612a
Initial pthreads version
| author | Merten Sach <msach@mailbox.tu-berlin.de> |
|---|---|
| date | Wed, 03 Aug 2011 19:30:34 +0200 |
| parents | |
| children | 8e7bdab2840f |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:624ce82ad481 |
|---|---|
| 1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ | |
| 2 /* File: pthreads_kmeans.c (OpenMP version) */ | |
| 3 /* Description: Implementation of simple k-means clustering algorithm */ | |
| 4 /* This program takes an array of N data objects, each with */ | |
| 5 /* M coordinates and performs a k-means clustering given a */ | |
| 6 /* user-provided value of the number of clusters (K). The */ | |
| 7 /* clustering results are saved in 2 arrays: */ | |
| 8 /* 1. a returned array of size [K][N] indicating the center */ | |
| 9 /* coordinates of K clusters */ | |
| 10 /* 2. membership[N] stores the cluster center ids, each */ | |
| 11 /* corresponding to the cluster a data object is assigned */ | |
| 12 /* */ | |
| 13 /* Author: Wei-keng Liao */ | |
| 14 /* ECE Department, Northwestern University */ | |
| 15 /* email: wkliao@ece.northwestern.edu */ | |
| 16 /* Copyright, 2005, Wei-keng Liao */ | |
| 17 /* */ | |
| 18 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ | |
| 19 | |
| 20 #include <stdio.h> | |
| 21 #include <stdlib.h> | |
| 22 #include <pthread.h> | |
| 23 #include <time.h> | |
| 24 #include <math.h> | |
| 25 #include "kmeans.h" | |
| 26 | |
| 27 #define PREC 300 | |
| 28 | |
| 29 char __ProgrammName[] = "kmeans"; | |
| 30 char __DataSet[255]; | |
| 31 | |
| 32 extern int nthreads; /* Thread count */ | |
| 33 double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */ | |
| 34 volatile int finished; | |
| 35 | |
| 36 pthread_barrier_t barr; | |
| 37 pthread_mutex_t lock1; | |
| 38 pthread_attr_t attr; | |
| 39 | |
| 40 /* | |
| 41 * Struct: input | |
| 42 * ------------- | |
| 43 * Encapsulates all the input data for the benchmark, i.e. the object list, | |
| 44 * clustering output and the input statistics. | |
| 45 */ | |
| 46 struct input { | |
| 47 int t; | |
| 48 double **objects; /* Object list */ | |
| 49 double **clusters; /* Cluster list, out: [numClusters][numCoords] */ | |
| 50 int *membership; /* For each object, contains the index of the cluster it currently belongs to */ | |
| 51 int **local_newClusterSize; /* Thread-safe, [nthreads][numClusters] */ | |
| 52 double ***local_newClusters; /* Thread-safe, [nthreads][numClusters][numCoords] */ | |
| 53 int numObjs,numClusters,numCoords; | |
| 54 }; | |
| 55 | |
| 56 /* | |
| 57 * Function: euclid_dist_2 | |
| 58 * ----------------------- | |
| 59 * Computes the square of the euclidean distance between two multi-dimensional points. | |
| 60 */ | |
| 61 __inline static double euclid_dist_2(int numdims, double *coord1, double *coord2) { | |
| 62 int i; | |
| 63 double ans=0.0; | |
| 64 | |
| 65 for (i=0; i<numdims; i++) | |
| 66 ans += (coord1[i]-coord2[i]) * (coord1[i]-coord2[i]); | |
| 67 | |
| 68 return(ans); | |
| 69 } | |
| 70 | |
| 71 /* | |
| 72 * Function: find_nearest_cluster | |
| 73 * ------------------------------ | |
| 74 * Function determining the cluster center which is closest to the given object. | |
| 75 * Returns the index of that cluster center. | |
| 76 */ | |
| 77 __inline static int find_nearest_cluster(int numClusters, int numCoords, double *object, double **clusters) { | |
| 78 int index, i; | |
| 79 double dist, min_dist; | |
| 80 | |
| 81 /* find the cluster id that has min distance to object */ | |
| 82 index = 0; | |
| 83 min_dist = euclid_dist_2(numCoords, object, clusters[0]); | |
| 84 for (i=1; i<numClusters; i++) { | |
| 85 dist = euclid_dist_2(numCoords, object, clusters[i]); | |
| 86 | |
| 87 /* no need square root */ | |
| 88 if (dist < min_dist) { /* find the min and its array index */ | |
| 89 min_dist = dist; | |
| 90 index = i; | |
| 91 } | |
| 92 } | |
| 93 return index; | |
| 94 } | |
| 95 | |
| 96 void work(struct input *x){ | |
| 97 int tid = x->t; | |
| 98 double local_delta=0; | |
| 99 int i; | |
| 100 for (i = tid; i < x->numObjs; i += nthreads) { | |
| 101 /* find the array index of nearest cluster center */ | |
| 102 int index = find_nearest_cluster(x->numClusters, x->numCoords, | |
| 103 x->objects[i], x->clusters); | |
| 104 | |
| 105 /* if membership changes, increase delta by 1 */ | |
| 106 if (x->membership[i] != index) | |
| 107 local_delta += 1.0; | |
| 108 /* assign the membership to object i */ | |
| 109 x->membership[i] = index; | |
| 110 | |
| 111 /* update new cluster centers : sum of all objects located | |
| 112 within (average will be performed later) */ | |
| 113 x->local_newClusterSize[tid][index]++; | |
| 114 int j; | |
| 115 for (j=0; j < x->numCoords; j++) | |
| 116 x->local_newClusters[tid][index][j] += x->objects[i][j]; | |
| 117 | |
| 118 } | |
| 119 pthread_mutex_lock(&lock1); | |
| 120 delta +=local_delta; | |
| 121 pthread_mutex_unlock(&lock1); | |
| 122 } | |
| 123 /* | |
| 124 * Function: thread function work | |
| 125 * -------------- | |
| 126 * Worker function for threading. Work distribution is done so that each thread computers | |
| 127 */ | |
| 128 void* tfwork(void *ip) | |
| 129 { | |
| 130 struct input *x; | |
| 131 x = (struct input *)ip; | |
| 132 | |
| 133 for(;;){ | |
| 134 pthread_barrier_wait(&barr); | |
| 135 if (finished){ | |
| 136 break; | |
| 137 } | |
| 138 work(x); | |
| 139 pthread_barrier_wait(&barr); | |
| 140 } | |
| 141 | |
| 142 pthread_exit(NULL); | |
| 143 } | |
| 144 | |
| 145 /* | |
| 146 * Function: create_array_2d_f | |
| 147 * -------------------------- | |
| 148 * Allocates memory for a 2-dim double array as needed for the algorithm. | |
| 149 */ | |
| 150 double** create_array_2d_f(int height, int width) { | |
| 151 double** ptr; | |
| 152 int i; | |
| 153 ptr = calloc(height, sizeof(double*)); | |
| 154 assert(ptr != NULL); | |
| 155 ptr[0] = calloc(width * height, sizeof(double)); | |
| 156 assert(ptr[0] != NULL); | |
| 157 /* Assign pointers correctly */ | |
| 158 for(i = 1; i < height; i++) | |
| 159 ptr[i] = ptr[i-1] + width; | |
| 160 return ptr; | |
| 161 } | |
| 162 | |
| 163 /* | |
| 164 * Function: create_array_2Dd_i | |
| 165 * -------------------------- | |
| 166 * Allocates memory for a 2-dim integer array as needed for the algorithm. | |
| 167 */ | |
| 168 int** create_array_2d_i(int height, int width) { | |
| 169 int** ptr; | |
| 170 int i; | |
| 171 ptr = calloc(height, sizeof(int*)); | |
| 172 assert(ptr != NULL); | |
| 173 ptr[0] = calloc(width * height, sizeof(int)); | |
| 174 assert(ptr[0] != NULL); | |
| 175 /* Assign pointers correctly */ | |
| 176 for(i = 1; i < height; i++) | |
| 177 ptr[i] = ptr[i-1] + width; | |
| 178 return ptr; | |
| 179 } | |
| 180 | |
| 181 /* | |
| 182 * Function: pthreads_kmeans | |
| 183 * ------------------------- | |
| 184 * Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords]. | |
| 185 */ | |
| 186 double** pthreads_kmeans(int is_perform_atomic, /* in: */ | |
| 187 double **objects, /* in: [numObjs][numCoords] */ | |
| 188 int numCoords, /* no. coordinates */ | |
| 189 int numObjs, /* no. objects */ | |
| 190 int numClusters, /* no. clusters */ | |
| 191 double threshold, /* % objects change membership */ | |
| 192 int *membership) /* out: [numObjs] */ | |
| 193 { | |
| 194 | |
| 195 int i, j, k, index, loop = 0, rc; | |
| 196 int *newClusterSize; /* [numClusters]: no. objects assigned in each | |
| 197 new cluster */ | |
| 198 double **clusters; /* out: [numClusters][numCoords] */ | |
| 199 double **newClusters; /* [numClusters][numCoords] */ | |
| 200 double timing; | |
| 201 int **local_newClusterSize; /* [nthreads][numClusters] */ | |
| 202 double ***local_newClusters; /* [nthreads][numClusters][numCoords] */ | |
| 203 | |
| 204 pthread_t *thread; | |
| 205 | |
| 206 /* === MEMORY SETUP === */ | |
| 207 | |
| 208 /* [numClusters] clusters of [numCoords] double coordinates each */ | |
| 209 clusters = create_array_2d_f(numClusters, numCoords); | |
| 210 | |
| 211 /* Pick first numClusters elements of objects[] as initial cluster centers */ | |
| 212 for (i=0; i < numClusters; i++) | |
| 213 for (j=0; j < numCoords; j++) | |
| 214 clusters[i][j] = objects[i][j]; | |
| 215 | |
| 216 /* Initialize membership, no object belongs to any cluster yet */ | |
| 217 for (i = 0; i < numObjs; i++) | |
| 218 membership[i] = -1; | |
| 219 | |
| 220 /* newClusterSize holds information on the count of members in each cluster */ | |
| 221 newClusterSize = (int*)calloc(numClusters, sizeof(int)); | |
| 222 assert(newClusterSize != NULL); | |
| 223 | |
| 224 /* newClusters holds the coordinates of the freshly created clusters */ | |
| 225 newClusters = create_array_2d_f(numClusters, numCoords); | |
| 226 local_newClusterSize = create_array_2d_i(nthreads, numClusters); | |
| 227 | |
| 228 /* local_newClusters is a 3D array */ | |
| 229 local_newClusters = (double***)malloc(nthreads * sizeof(double**)); | |
| 230 assert(local_newClusters != NULL); | |
| 231 local_newClusters[0] = (double**) malloc(nthreads * numClusters * sizeof(double*)); | |
| 232 assert(local_newClusters[0] != NULL); | |
| 233 | |
| 234 /* Set up the pointers */ | |
| 235 for (i = 1; i < nthreads; i++) | |
| 236 local_newClusters[i] = local_newClusters[i-1] + numClusters; | |
| 237 | |
| 238 for (i = 0; i < nthreads; i++) { | |
| 239 for (j = 0; j < numClusters; j++) { | |
| 240 local_newClusters[i][j] = (double*)calloc(numCoords, sizeof(double)); | |
| 241 assert(local_newClusters[i][j] != NULL); | |
| 242 } | |
| 243 } | |
| 244 /* Perform thread setup */ | |
| 245 thread = (pthread_t*)calloc(nthreads, sizeof(pthread_t)); | |
| 246 | |
| 247 printf("nthreads %d\n", nthreads); | |
| 248 pthread_barrier_init(&barr, NULL, nthreads); | |
| 249 pthread_attr_init(&attr); | |
| 250 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); | |
| 251 pthread_mutex_init(&lock1, NULL); | |
| 252 finished=0; | |
| 253 | |
| 254 struct input *ip = malloc(nthreads * sizeof(struct input)); | |
| 255 /* Provide thread-safe memory locations for each worker */ | |
| 256 for(i = 0; i < nthreads; i++){ | |
| 257 ip[i].t = i; | |
| 258 ip[i].objects=objects; | |
| 259 ip[i].clusters=clusters; | |
| 260 ip[i].membership=membership; | |
| 261 ip[i].local_newClusterSize=local_newClusterSize; | |
| 262 ip[i].local_newClusters=local_newClusters; | |
| 263 ip[i].numObjs=numObjs; | |
| 264 ip[i].numClusters=numClusters; | |
| 265 ip[i].numCoords=numCoords; | |
| 266 | |
| 267 if (i>0){ | |
| 268 rc = pthread_create(&thread[i], &attr, tfwork, (void *)&ip[i]); | |
| 269 if (rc) { | |
| 270 fprintf(stderr, "ERROR: Return Code For Thread Creation Is %d\n", rc); | |
| 271 exit(EXIT_FAILURE); | |
| 272 } | |
| 273 } | |
| 274 } | |
| 275 | |
| 276 /* === COMPUTATIONAL PHASE === */ | |
| 277 | |
| 278 do { | |
| 279 delta = 0.0; | |
| 280 pthread_barrier_wait(&barr); | |
| 281 work(&ip[0]); | |
| 282 | |
| 283 pthread_barrier_wait(&barr); | |
| 284 /* Let the main thread perform the array reduction */ | |
| 285 for (i = 0; i < numClusters; i++) { | |
| 286 for (j = 0; j < nthreads; j++) { | |
| 287 newClusterSize[i] += local_newClusterSize[j][i]; | |
| 288 local_newClusterSize[j][i] = 0.0; | |
| 289 for (k = 0; k < numCoords; k++) { | |
| 290 newClusters[i][k] += local_newClusters[j][i][k]; | |
| 291 local_newClusters[j][i][k] = 0.0; | |
| 292 } | |
| 293 } | |
| 294 } | |
| 295 | |
| 296 /* Average the sum and replace old cluster centers with newClusters */ | |
| 297 for (i = 0; i < numClusters; i++) { | |
| 298 for (j = 0; j < numCoords; j++) { | |
| 299 if (newClusterSize[i] > 1) | |
| 300 clusters[i][j] = newClusters[i][j] / newClusterSize[i]; | |
| 301 newClusters[i][j] = 0.0; /* set back to 0 */ | |
| 302 } | |
| 303 newClusterSize[i] = 0; /* set back to 0 */ | |
| 304 } | |
| 305 | |
| 306 delta /= numObjs; | |
| 307 } while (loop++ < PREC && delta > threshold); | |
| 308 | |
| 309 // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program, | |
| 310 // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed | |
| 311 // iterations, therefore making benchmarking completely unreliable. | |
| 312 | |
| 313 finished=1; | |
| 314 pthread_barrier_wait(&barr); | |
| 315 | |
| 316 for(i = 1; i < nthreads; i++) { | |
| 317 rc = pthread_join(thread[i], NULL); | |
| 318 if (rc) { | |
| 319 fprintf(stderr, "ERROR: Return Code For Thread Join Is %d\n", rc); | |
| 320 exit(EXIT_FAILURE); | |
| 321 } | |
| 322 } | |
| 323 | |
| 324 free(ip); | |
| 325 free(thread); | |
| 326 pthread_barrier_destroy(&barr); | |
| 327 pthread_mutex_destroy(&lock1); | |
| 328 pthread_attr_destroy(&attr); | |
| 329 | |
| 330 free(local_newClusterSize[0]); | |
| 331 free(local_newClusterSize); | |
| 332 | |
| 333 for (i = 0; i < nthreads; i++) | |
| 334 for (j = 0; j < numClusters; j++) | |
| 335 free(local_newClusters[i][j]); | |
| 336 free(local_newClusters[0]); | |
| 337 free(local_newClusters); | |
| 338 | |
| 339 free(newClusters[0]); | |
| 340 free(newClusters); | |
| 341 free(newClusterSize); | |
| 342 return clusters; | |
| 343 } | |
| 344 |
