annotate pthreads_kmeans.c @ 0:e69e4c2d612a

Initial pthreads version
author Merten Sach <msach@mailbox.tu-berlin.de>
date Wed, 03 Aug 2011 19:30:34 +0200
parents
children 8e7bdab2840f
rev   line source
msach@0 1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
msach@0 2 /* File: pthreads_kmeans.c (OpenMP version) */
msach@0 3 /* Description: Implementation of simple k-means clustering algorithm */
msach@0 4 /* This program takes an array of N data objects, each with */
msach@0 5 /* M coordinates and performs a k-means clustering given a */
msach@0 6 /* user-provided value of the number of clusters (K). The */
msach@0 7 /* clustering results are saved in 2 arrays: */
msach@0 8 /* 1. a returned array of size [K][N] indicating the center */
msach@0 9 /* coordinates of K clusters */
msach@0 10 /* 2. membership[N] stores the cluster center ids, each */
msach@0 11 /* corresponding to the cluster a data object is assigned */
msach@0 12 /* */
msach@0 13 /* Author: Wei-keng Liao */
msach@0 14 /* ECE Department, Northwestern University */
msach@0 15 /* email: wkliao@ece.northwestern.edu */
msach@0 16 /* Copyright, 2005, Wei-keng Liao */
msach@0 17 /* */
msach@0 18 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
msach@0 19
msach@0 20 #include <stdio.h>
msach@0 21 #include <stdlib.h>
msach@0 22 #include <pthread.h>
msach@0 23 #include <time.h>
msach@0 24 #include <math.h>
msach@0 25 #include "kmeans.h"
msach@0 26
msach@0 27 #define PREC 300
msach@0 28
msach@0 29 char __ProgrammName[] = "kmeans";
msach@0 30 char __DataSet[255];
msach@0 31
msach@0 32 extern int nthreads; /* Thread count */
msach@0 33 double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */
msach@0 34 volatile int finished;
msach@0 35
msach@0 36 pthread_barrier_t barr;
msach@0 37 pthread_mutex_t lock1;
msach@0 38 pthread_attr_t attr;
msach@0 39
msach@0 40 /*
msach@0 41 * Struct: input
msach@0 42 * -------------
msach@0 43 * Encapsulates all the input data for the benchmark, i.e. the object list,
msach@0 44 * clustering output and the input statistics.
msach@0 45 */
msach@0 46 struct input {
msach@0 47 int t;
msach@0 48 double **objects; /* Object list */
msach@0 49 double **clusters; /* Cluster list, out: [numClusters][numCoords] */
msach@0 50 int *membership; /* For each object, contains the index of the cluster it currently belongs to */
msach@0 51 int **local_newClusterSize; /* Thread-safe, [nthreads][numClusters] */
msach@0 52 double ***local_newClusters; /* Thread-safe, [nthreads][numClusters][numCoords] */
msach@0 53 int numObjs,numClusters,numCoords;
msach@0 54 };
msach@0 55
msach@0 56 /*
msach@0 57 * Function: euclid_dist_2
msach@0 58 * -----------------------
msach@0 59 * Computes the square of the euclidean distance between two multi-dimensional points.
msach@0 60 */
msach@0 61 __inline static double euclid_dist_2(int numdims, double *coord1, double *coord2) {
msach@0 62 int i;
msach@0 63 double ans=0.0;
msach@0 64
msach@0 65 for (i=0; i<numdims; i++)
msach@0 66 ans += (coord1[i]-coord2[i]) * (coord1[i]-coord2[i]);
msach@0 67
msach@0 68 return(ans);
msach@0 69 }
msach@0 70
msach@0 71 /*
msach@0 72 * Function: find_nearest_cluster
msach@0 73 * ------------------------------
msach@0 74 * Function determining the cluster center which is closest to the given object.
msach@0 75 * Returns the index of that cluster center.
msach@0 76 */
msach@0 77 __inline static int find_nearest_cluster(int numClusters, int numCoords, double *object, double **clusters) {
msach@0 78 int index, i;
msach@0 79 double dist, min_dist;
msach@0 80
msach@0 81 /* find the cluster id that has min distance to object */
msach@0 82 index = 0;
msach@0 83 min_dist = euclid_dist_2(numCoords, object, clusters[0]);
msach@0 84 for (i=1; i<numClusters; i++) {
msach@0 85 dist = euclid_dist_2(numCoords, object, clusters[i]);
msach@0 86
msach@0 87 /* no need square root */
msach@0 88 if (dist < min_dist) { /* find the min and its array index */
msach@0 89 min_dist = dist;
msach@0 90 index = i;
msach@0 91 }
msach@0 92 }
msach@0 93 return index;
msach@0 94 }
msach@0 95
msach@0 96 void work(struct input *x){
msach@0 97 int tid = x->t;
msach@0 98 double local_delta=0;
msach@0 99 int i;
msach@0 100 for (i = tid; i < x->numObjs; i += nthreads) {
msach@0 101 /* find the array index of nearest cluster center */
msach@0 102 int index = find_nearest_cluster(x->numClusters, x->numCoords,
msach@0 103 x->objects[i], x->clusters);
msach@0 104
msach@0 105 /* if membership changes, increase delta by 1 */
msach@0 106 if (x->membership[i] != index)
msach@0 107 local_delta += 1.0;
msach@0 108 /* assign the membership to object i */
msach@0 109 x->membership[i] = index;
msach@0 110
msach@0 111 /* update new cluster centers : sum of all objects located
msach@0 112 within (average will be performed later) */
msach@0 113 x->local_newClusterSize[tid][index]++;
msach@0 114 int j;
msach@0 115 for (j=0; j < x->numCoords; j++)
msach@0 116 x->local_newClusters[tid][index][j] += x->objects[i][j];
msach@0 117
msach@0 118 }
msach@0 119 pthread_mutex_lock(&lock1);
msach@0 120 delta +=local_delta;
msach@0 121 pthread_mutex_unlock(&lock1);
msach@0 122 }
msach@0 123 /*
msach@0 124 * Function: thread function work
msach@0 125 * --------------
msach@0 126 * Worker function for threading. Work distribution is done so that each thread computers
msach@0 127 */
msach@0 128 void* tfwork(void *ip)
msach@0 129 {
msach@0 130 struct input *x;
msach@0 131 x = (struct input *)ip;
msach@0 132
msach@0 133 for(;;){
msach@0 134 pthread_barrier_wait(&barr);
msach@0 135 if (finished){
msach@0 136 break;
msach@0 137 }
msach@0 138 work(x);
msach@0 139 pthread_barrier_wait(&barr);
msach@0 140 }
msach@0 141
msach@0 142 pthread_exit(NULL);
msach@0 143 }
msach@0 144
msach@0 145 /*
msach@0 146 * Function: create_array_2d_f
msach@0 147 * --------------------------
msach@0 148 * Allocates memory for a 2-dim double array as needed for the algorithm.
msach@0 149 */
msach@0 150 double** create_array_2d_f(int height, int width) {
msach@0 151 double** ptr;
msach@0 152 int i;
msach@0 153 ptr = calloc(height, sizeof(double*));
msach@0 154 assert(ptr != NULL);
msach@0 155 ptr[0] = calloc(width * height, sizeof(double));
msach@0 156 assert(ptr[0] != NULL);
msach@0 157 /* Assign pointers correctly */
msach@0 158 for(i = 1; i < height; i++)
msach@0 159 ptr[i] = ptr[i-1] + width;
msach@0 160 return ptr;
msach@0 161 }
msach@0 162
msach@0 163 /*
msach@0 164 * Function: create_array_2Dd_i
msach@0 165 * --------------------------
msach@0 166 * Allocates memory for a 2-dim integer array as needed for the algorithm.
msach@0 167 */
msach@0 168 int** create_array_2d_i(int height, int width) {
msach@0 169 int** ptr;
msach@0 170 int i;
msach@0 171 ptr = calloc(height, sizeof(int*));
msach@0 172 assert(ptr != NULL);
msach@0 173 ptr[0] = calloc(width * height, sizeof(int));
msach@0 174 assert(ptr[0] != NULL);
msach@0 175 /* Assign pointers correctly */
msach@0 176 for(i = 1; i < height; i++)
msach@0 177 ptr[i] = ptr[i-1] + width;
msach@0 178 return ptr;
msach@0 179 }
msach@0 180
msach@0 181 /*
msach@0 182 * Function: pthreads_kmeans
msach@0 183 * -------------------------
msach@0 184 * Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords].
msach@0 185 */
msach@0 186 double** pthreads_kmeans(int is_perform_atomic, /* in: */
msach@0 187 double **objects, /* in: [numObjs][numCoords] */
msach@0 188 int numCoords, /* no. coordinates */
msach@0 189 int numObjs, /* no. objects */
msach@0 190 int numClusters, /* no. clusters */
msach@0 191 double threshold, /* % objects change membership */
msach@0 192 int *membership) /* out: [numObjs] */
msach@0 193 {
msach@0 194
msach@0 195 int i, j, k, index, loop = 0, rc;
msach@0 196 int *newClusterSize; /* [numClusters]: no. objects assigned in each
msach@0 197 new cluster */
msach@0 198 double **clusters; /* out: [numClusters][numCoords] */
msach@0 199 double **newClusters; /* [numClusters][numCoords] */
msach@0 200 double timing;
msach@0 201 int **local_newClusterSize; /* [nthreads][numClusters] */
msach@0 202 double ***local_newClusters; /* [nthreads][numClusters][numCoords] */
msach@0 203
msach@0 204 pthread_t *thread;
msach@0 205
msach@0 206 /* === MEMORY SETUP === */
msach@0 207
msach@0 208 /* [numClusters] clusters of [numCoords] double coordinates each */
msach@0 209 clusters = create_array_2d_f(numClusters, numCoords);
msach@0 210
msach@0 211 /* Pick first numClusters elements of objects[] as initial cluster centers */
msach@0 212 for (i=0; i < numClusters; i++)
msach@0 213 for (j=0; j < numCoords; j++)
msach@0 214 clusters[i][j] = objects[i][j];
msach@0 215
msach@0 216 /* Initialize membership, no object belongs to any cluster yet */
msach@0 217 for (i = 0; i < numObjs; i++)
msach@0 218 membership[i] = -1;
msach@0 219
msach@0 220 /* newClusterSize holds information on the count of members in each cluster */
msach@0 221 newClusterSize = (int*)calloc(numClusters, sizeof(int));
msach@0 222 assert(newClusterSize != NULL);
msach@0 223
msach@0 224 /* newClusters holds the coordinates of the freshly created clusters */
msach@0 225 newClusters = create_array_2d_f(numClusters, numCoords);
msach@0 226 local_newClusterSize = create_array_2d_i(nthreads, numClusters);
msach@0 227
msach@0 228 /* local_newClusters is a 3D array */
msach@0 229 local_newClusters = (double***)malloc(nthreads * sizeof(double**));
msach@0 230 assert(local_newClusters != NULL);
msach@0 231 local_newClusters[0] = (double**) malloc(nthreads * numClusters * sizeof(double*));
msach@0 232 assert(local_newClusters[0] != NULL);
msach@0 233
msach@0 234 /* Set up the pointers */
msach@0 235 for (i = 1; i < nthreads; i++)
msach@0 236 local_newClusters[i] = local_newClusters[i-1] + numClusters;
msach@0 237
msach@0 238 for (i = 0; i < nthreads; i++) {
msach@0 239 for (j = 0; j < numClusters; j++) {
msach@0 240 local_newClusters[i][j] = (double*)calloc(numCoords, sizeof(double));
msach@0 241 assert(local_newClusters[i][j] != NULL);
msach@0 242 }
msach@0 243 }
msach@0 244 /* Perform thread setup */
msach@0 245 thread = (pthread_t*)calloc(nthreads, sizeof(pthread_t));
msach@0 246
msach@0 247 printf("nthreads %d\n", nthreads);
msach@0 248 pthread_barrier_init(&barr, NULL, nthreads);
msach@0 249 pthread_attr_init(&attr);
msach@0 250 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
msach@0 251 pthread_mutex_init(&lock1, NULL);
msach@0 252 finished=0;
msach@0 253
msach@0 254 struct input *ip = malloc(nthreads * sizeof(struct input));
msach@0 255 /* Provide thread-safe memory locations for each worker */
msach@0 256 for(i = 0; i < nthreads; i++){
msach@0 257 ip[i].t = i;
msach@0 258 ip[i].objects=objects;
msach@0 259 ip[i].clusters=clusters;
msach@0 260 ip[i].membership=membership;
msach@0 261 ip[i].local_newClusterSize=local_newClusterSize;
msach@0 262 ip[i].local_newClusters=local_newClusters;
msach@0 263 ip[i].numObjs=numObjs;
msach@0 264 ip[i].numClusters=numClusters;
msach@0 265 ip[i].numCoords=numCoords;
msach@0 266
msach@0 267 if (i>0){
msach@0 268 rc = pthread_create(&thread[i], &attr, tfwork, (void *)&ip[i]);
msach@0 269 if (rc) {
msach@0 270 fprintf(stderr, "ERROR: Return Code For Thread Creation Is %d\n", rc);
msach@0 271 exit(EXIT_FAILURE);
msach@0 272 }
msach@0 273 }
msach@0 274 }
msach@0 275
msach@0 276 /* === COMPUTATIONAL PHASE === */
msach@0 277
msach@0 278 do {
msach@0 279 delta = 0.0;
msach@0 280 pthread_barrier_wait(&barr);
msach@0 281 work(&ip[0]);
msach@0 282
msach@0 283 pthread_barrier_wait(&barr);
msach@0 284 /* Let the main thread perform the array reduction */
msach@0 285 for (i = 0; i < numClusters; i++) {
msach@0 286 for (j = 0; j < nthreads; j++) {
msach@0 287 newClusterSize[i] += local_newClusterSize[j][i];
msach@0 288 local_newClusterSize[j][i] = 0.0;
msach@0 289 for (k = 0; k < numCoords; k++) {
msach@0 290 newClusters[i][k] += local_newClusters[j][i][k];
msach@0 291 local_newClusters[j][i][k] = 0.0;
msach@0 292 }
msach@0 293 }
msach@0 294 }
msach@0 295
msach@0 296 /* Average the sum and replace old cluster centers with newClusters */
msach@0 297 for (i = 0; i < numClusters; i++) {
msach@0 298 for (j = 0; j < numCoords; j++) {
msach@0 299 if (newClusterSize[i] > 1)
msach@0 300 clusters[i][j] = newClusters[i][j] / newClusterSize[i];
msach@0 301 newClusters[i][j] = 0.0; /* set back to 0 */
msach@0 302 }
msach@0 303 newClusterSize[i] = 0; /* set back to 0 */
msach@0 304 }
msach@0 305
msach@0 306 delta /= numObjs;
msach@0 307 } while (loop++ < PREC && delta > threshold);
msach@0 308
msach@0 309 // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program,
msach@0 310 // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed
msach@0 311 // iterations, therefore making benchmarking completely unreliable.
msach@0 312
msach@0 313 finished=1;
msach@0 314 pthread_barrier_wait(&barr);
msach@0 315
msach@0 316 for(i = 1; i < nthreads; i++) {
msach@0 317 rc = pthread_join(thread[i], NULL);
msach@0 318 if (rc) {
msach@0 319 fprintf(stderr, "ERROR: Return Code For Thread Join Is %d\n", rc);
msach@0 320 exit(EXIT_FAILURE);
msach@0 321 }
msach@0 322 }
msach@0 323
msach@0 324 free(ip);
msach@0 325 free(thread);
msach@0 326 pthread_barrier_destroy(&barr);
msach@0 327 pthread_mutex_destroy(&lock1);
msach@0 328 pthread_attr_destroy(&attr);
msach@0 329
msach@0 330 free(local_newClusterSize[0]);
msach@0 331 free(local_newClusterSize);
msach@0 332
msach@0 333 for (i = 0; i < nthreads; i++)
msach@0 334 for (j = 0; j < numClusters; j++)
msach@0 335 free(local_newClusters[i][j]);
msach@0 336 free(local_newClusters[0]);
msach@0 337 free(local_newClusters);
msach@0 338
msach@0 339 free(newClusters[0]);
msach@0 340 free(newClusters);
msach@0 341 free(newClusterSize);
msach@0 342 return clusters;
msach@0 343 }
msach@0 344