annotate pthreads_kmeans.c @ 2:467746c73fd0

DataSet print
author Merten Sach <msach@mailbox.tu-berlin.de>
date Wed, 28 Sep 2011 15:07:32 +0200
parents e69e4c2d612a
children
rev   line source
msach@0 1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
msach@0 2 /* File: pthreads_kmeans.c (OpenMP version) */
msach@0 3 /* Description: Implementation of simple k-means clustering algorithm */
msach@0 4 /* This program takes an array of N data objects, each with */
msach@0 5 /* M coordinates and performs a k-means clustering given a */
msach@0 6 /* user-provided value of the number of clusters (K). The */
msach@0 7 /* clustering results are saved in 2 arrays: */
msach@0 8 /* 1. a returned array of size [K][N] indicating the center */
msach@0 9 /* coordinates of K clusters */
msach@0 10 /* 2. membership[N] stores the cluster center ids, each */
msach@0 11 /* corresponding to the cluster a data object is assigned */
msach@0 12 /* */
msach@0 13 /* Author: Wei-keng Liao */
msach@0 14 /* ECE Department, Northwestern University */
msach@0 15 /* email: wkliao@ece.northwestern.edu */
msach@0 16 /* Copyright, 2005, Wei-keng Liao */
msach@0 17 /* */
msach@0 18 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
msach@0 19
msach@0 20 #include <stdio.h>
msach@0 21 #include <stdlib.h>
msach@0 22 #include <pthread.h>
msach@0 23 #include <time.h>
msach@0 24 #include <math.h>
msach@0 25 #include "kmeans.h"
msach@0 26
msach@1 27 #include "VPThread_lib/VPThread.h"
msach@1 28
msach@0 29 #define PREC 300
msach@0 30
msach@1 31 struct barrier_t
msach@1 32 {
msach@1 33 int counter;
msach@1 34 int nthreads;
msach@1 35 int32 mutex;
msach@1 36 int32 cond;
msach@1 37 };
msach@1 38 typedef struct barrier_t barrier;
msach@0 39
msach@0 40 extern int nthreads; /* Thread count */
msach@0 41 double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */
msach@0 42 volatile int finished;
msach@0 43
msach@1 44 barrier barr;
msach@1 45 int32 lock1;
msach@0 46 pthread_attr_t attr;
msach@0 47
msach@1 48 void inline barrier_init(barrier *barr, int nthreads, VirtProcr *VProc)
msach@1 49 {
msach@1 50 barr->counter = 0;
msach@1 51 barr->nthreads = nthreads;
msach@1 52 barr->mutex = VPThread__make_mutex(VProc);
msach@1 53 barr->cond = VPThread__make_cond(barr->mutex, VProc);
msach@1 54 }
msach@1 55
msach@1 56 void inline barrier_wait(barrier *barr, VirtProcr *VProc)
msach@1 57 {
msach@1 58 int i;
msach@1 59
msach@1 60 VPThread__mutex_lock(barr->mutex, VProc);
msach@1 61 barr->counter++;
msach@1 62 if(barr->counter == barr->nthreads)
msach@1 63 {
msach@1 64 barr->counter = 0;
msach@1 65 for(i=0; i < barr->nthreads; i++)
msach@1 66 VPThread__cond_signal(barr->cond, VProc);
msach@1 67 }
msach@1 68 else
msach@1 69 {
msach@1 70 VPThread__cond_wait(barr->cond, VProc);
msach@1 71 }
msach@1 72 VPThread__mutex_unlock(barr->mutex, VProc);
msach@1 73 }
msach@1 74
msach@0 75 /*
msach@0 76 * Struct: input
msach@0 77 * -------------
msach@0 78 * Encapsulates all the input data for the benchmark, i.e. the object list,
msach@0 79 * clustering output and the input statistics.
msach@0 80 */
msach@0 81 struct input {
msach@0 82 int t;
msach@0 83 double **objects; /* Object list */
msach@0 84 double **clusters; /* Cluster list, out: [numClusters][numCoords] */
msach@0 85 int *membership; /* For each object, contains the index of the cluster it currently belongs to */
msach@0 86 int **local_newClusterSize; /* Thread-safe, [nthreads][numClusters] */
msach@0 87 double ***local_newClusters; /* Thread-safe, [nthreads][numClusters][numCoords] */
msach@0 88 int numObjs,numClusters,numCoords;
msach@0 89 };
msach@0 90
msach@0 91 /*
msach@0 92 * Function: euclid_dist_2
msach@0 93 * -----------------------
msach@0 94 * Computes the square of the euclidean distance between two multi-dimensional points.
msach@0 95 */
msach@0 96 __inline static double euclid_dist_2(int numdims, double *coord1, double *coord2) {
msach@0 97 int i;
msach@0 98 double ans=0.0;
msach@0 99
msach@0 100 for (i=0; i<numdims; i++)
msach@0 101 ans += (coord1[i]-coord2[i]) * (coord1[i]-coord2[i]);
msach@0 102
msach@0 103 return(ans);
msach@0 104 }
msach@0 105
msach@0 106 /*
msach@0 107 * Function: find_nearest_cluster
msach@0 108 * ------------------------------
msach@0 109 * Function determining the cluster center which is closest to the given object.
msach@0 110 * Returns the index of that cluster center.
msach@0 111 */
msach@0 112 __inline static int find_nearest_cluster(int numClusters, int numCoords, double *object, double **clusters) {
msach@0 113 int index, i;
msach@0 114 double dist, min_dist;
msach@0 115
msach@0 116 /* find the cluster id that has min distance to object */
msach@0 117 index = 0;
msach@0 118 min_dist = euclid_dist_2(numCoords, object, clusters[0]);
msach@0 119 for (i=1; i<numClusters; i++) {
msach@0 120 dist = euclid_dist_2(numCoords, object, clusters[i]);
msach@0 121
msach@0 122 /* no need square root */
msach@0 123 if (dist < min_dist) { /* find the min and its array index */
msach@0 124 min_dist = dist;
msach@0 125 index = i;
msach@0 126 }
msach@0 127 }
msach@0 128 return index;
msach@0 129 }
msach@0 130
msach@1 131 void work(struct input *x, VirtProcr *VProc){
msach@0 132 int tid = x->t;
msach@0 133 double local_delta=0;
msach@0 134 int i;
msach@0 135 for (i = tid; i < x->numObjs; i += nthreads) {
msach@0 136 /* find the array index of nearest cluster center */
msach@0 137 int index = find_nearest_cluster(x->numClusters, x->numCoords,
msach@0 138 x->objects[i], x->clusters);
msach@0 139
msach@0 140 /* if membership changes, increase delta by 1 */
msach@0 141 if (x->membership[i] != index)
msach@0 142 local_delta += 1.0;
msach@0 143 /* assign the membership to object i */
msach@0 144 x->membership[i] = index;
msach@0 145
msach@0 146 /* update new cluster centers : sum of all objects located
msach@0 147 within (average will be performed later) */
msach@0 148 x->local_newClusterSize[tid][index]++;
msach@0 149 int j;
msach@0 150 for (j=0; j < x->numCoords; j++)
msach@0 151 x->local_newClusters[tid][index][j] += x->objects[i][j];
msach@0 152
msach@0 153 }
msach@1 154 VPThread__mutex_lock(lock1, VProc);
msach@0 155 delta +=local_delta;
msach@1 156 VPThread__mutex_unlock(lock1, VProc);
msach@0 157 }
msach@1 158
msach@0 159 /*
msach@0 160 * Function: thread function work
msach@0 161 * --------------
msach@0 162 * Worker function for threading. Work distribution is done so that each thread computers
msach@0 163 */
msach@1 164 void tfwork(void *ip, VirtProcr *VProc)
msach@0 165 {
msach@0 166 struct input *x;
msach@0 167 x = (struct input *)ip;
msach@0 168
msach@0 169 for(;;){
msach@1 170 barrier_wait(&barr, VProc);
msach@0 171 if (finished){
msach@0 172 break;
msach@0 173 }
msach@1 174 work(x, VProc);
msach@1 175 barrier_wait(&barr, VProc);
msach@0 176 }
msach@0 177
msach@1 178 //pthread_exit(NULL);
msach@1 179 VPThread__dissipate_thread(VProc);
msach@0 180 }
msach@0 181
msach@0 182 /*
msach@0 183 * Function: create_array_2d_f
msach@0 184 * --------------------------
msach@0 185 * Allocates memory for a 2-dim double array as needed for the algorithm.
msach@0 186 */
msach@1 187 double** create_array_2d_f(int height, int width, VirtProcr *VProc) {
msach@0 188 double** ptr;
msach@0 189 int i;
msach@1 190 ptr = VPThread__malloc(height * sizeof(double*), VProc);
msach@0 191 assert(ptr != NULL);
msach@1 192 ptr[0] = VPThread__malloc(width * height * sizeof(double), VProc);
msach@0 193 assert(ptr[0] != NULL);
msach@0 194 /* Assign pointers correctly */
msach@0 195 for(i = 1; i < height; i++)
msach@0 196 ptr[i] = ptr[i-1] + width;
msach@0 197 return ptr;
msach@0 198 }
msach@0 199
msach@0 200 /*
msach@0 201 * Function: create_array_2Dd_i
msach@0 202 * --------------------------
msach@0 203 * Allocates memory for a 2-dim integer array as needed for the algorithm.
msach@0 204 */
msach@1 205 int** create_array_2d_i(int height, int width, VirtProcr *VProc) {
msach@0 206 int** ptr;
msach@0 207 int i;
msach@1 208 ptr = VPThread__malloc(height * sizeof(int*), VProc);
msach@0 209 assert(ptr != NULL);
msach@1 210 ptr[0] = VPThread__malloc(width * height * sizeof(int), VProc);
msach@0 211 assert(ptr[0] != NULL);
msach@0 212 /* Assign pointers correctly */
msach@0 213 for(i = 1; i < height; i++)
msach@0 214 ptr[i] = ptr[i-1] + width;
msach@0 215 return ptr;
msach@0 216 }
msach@0 217
msach@0 218 /*
msach@0 219 * Function: pthreads_kmeans
msach@0 220 * -------------------------
msach@0 221 * Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords].
msach@0 222 */
msach@1 223 void pthreads_kmeans(void *data, VirtProcr *VProc)
msach@0 224 {
msach@1 225 struct call_data *cluster_data = (struct call_data*)data;
msach@1 226 //int is_perform_atomic = cluster_data->is_perform_atomic; /* in: */
msach@1 227 double **objects = cluster_data->objects; /* in: [numObjs][numCoords] */
msach@1 228 int numCoords = cluster_data->numCoords; /* no. coordinates */
msach@1 229 int numObjs = cluster_data->numObjs; /* no. objects */
msach@1 230 int numClusters = cluster_data->numClusters; /* no. clusters */
msach@1 231 double threshold = cluster_data->threshold; /* % objects change membership */
msach@1 232 int *membership = cluster_data->membership; /* out: [numObjs] */
msach@0 233
msach@1 234 int i, j, k, loop = 0;
msach@0 235 int *newClusterSize; /* [numClusters]: no. objects assigned in each
msach@0 236 new cluster */
msach@1 237 double **clusters = cluster_data->clusters; /* out: [numClusters][numCoords] */
msach@0 238 double **newClusters; /* [numClusters][numCoords] */
msach@1 239 //double timing;
msach@0 240 int **local_newClusterSize; /* [nthreads][numClusters] */
msach@0 241 double ***local_newClusters; /* [nthreads][numClusters][numCoords] */
msach@0 242
msach@1 243 VirtProcr **thread;
msach@0 244
msach@0 245 /* === MEMORY SETUP === */
msach@0 246
msach@0 247 /* [numClusters] clusters of [numCoords] double coordinates each */
msach@1 248 //Set pointers
msach@1 249 for(i = 1; i < numClusters; i++)
msach@1 250 clusters[i] = clusters[i-1] + numCoords;
msach@0 251
msach@0 252 /* Pick first numClusters elements of objects[] as initial cluster centers */
msach@0 253 for (i=0; i < numClusters; i++)
msach@0 254 for (j=0; j < numCoords; j++)
msach@0 255 clusters[i][j] = objects[i][j];
msach@0 256
msach@0 257 /* Initialize membership, no object belongs to any cluster yet */
msach@0 258 for (i = 0; i < numObjs; i++)
msach@0 259 membership[i] = -1;
msach@0 260
msach@0 261 /* newClusterSize holds information on the count of members in each cluster */
msach@1 262 newClusterSize = (int*)VPThread__malloc(numClusters * sizeof(int), VProc);
msach@0 263 assert(newClusterSize != NULL);
msach@0 264
msach@0 265 /* newClusters holds the coordinates of the freshly created clusters */
msach@1 266 newClusters = create_array_2d_f(numClusters, numCoords, VProc);
msach@1 267 local_newClusterSize = create_array_2d_i(nthreads, numClusters, VProc);
msach@0 268
msach@0 269 /* local_newClusters is a 3D array */
msach@1 270 local_newClusters = (double***)VPThread__malloc(nthreads * sizeof(double**), VProc);
msach@0 271 assert(local_newClusters != NULL);
msach@1 272 local_newClusters[0] = (double**) VPThread__malloc(nthreads * numClusters * sizeof(double*), VProc);
msach@0 273 assert(local_newClusters[0] != NULL);
msach@0 274
msach@0 275 /* Set up the pointers */
msach@0 276 for (i = 1; i < nthreads; i++)
msach@0 277 local_newClusters[i] = local_newClusters[i-1] + numClusters;
msach@0 278
msach@0 279 for (i = 0; i < nthreads; i++) {
msach@0 280 for (j = 0; j < numClusters; j++) {
msach@1 281 local_newClusters[i][j] = (double*)VPThread__malloc(numCoords * sizeof(double), VProc);
msach@0 282 assert(local_newClusters[i][j] != NULL);
msach@0 283 }
msach@0 284 }
msach@0 285 /* Perform thread setup */
msach@1 286 thread = (VirtProcr**)VPThread__malloc(nthreads * sizeof(VirtProcr*), VProc);
msach@0 287
msach@1 288 barrier_init(&barr, nthreads, VProc);
msach@1 289 lock1 = VPThread__make_mutex(VProc);
msach@0 290 finished=0;
msach@0 291
msach@1 292 struct input *ip = VPThread__malloc(nthreads * sizeof(struct input), VProc);
msach@0 293 /* Provide thread-safe memory locations for each worker */
msach@0 294 for(i = 0; i < nthreads; i++){
msach@0 295 ip[i].t = i;
msach@0 296 ip[i].objects=objects;
msach@0 297 ip[i].clusters=clusters;
msach@0 298 ip[i].membership=membership;
msach@0 299 ip[i].local_newClusterSize=local_newClusterSize;
msach@0 300 ip[i].local_newClusters=local_newClusters;
msach@0 301 ip[i].numObjs=numObjs;
msach@0 302 ip[i].numClusters=numClusters;
msach@0 303 ip[i].numCoords=numCoords;
msach@0 304
msach@0 305 if (i>0){
msach@1 306 thread[i] = VPThread__create_thread(tfwork, (void*)&ip[i], VProc);
msach@0 307 }
msach@0 308 }
msach@0 309
msach@0 310 /* === COMPUTATIONAL PHASE === */
msach@0 311
msach@0 312 do {
msach@0 313 delta = 0.0;
msach@1 314 barrier_wait(&barr, VProc);
msach@1 315 work(&ip[0], VProc);
msach@0 316
msach@1 317 barrier_wait(&barr, VProc);
msach@0 318 /* Let the main thread perform the array reduction */
msach@0 319 for (i = 0; i < numClusters; i++) {
msach@0 320 for (j = 0; j < nthreads; j++) {
msach@0 321 newClusterSize[i] += local_newClusterSize[j][i];
msach@0 322 local_newClusterSize[j][i] = 0.0;
msach@0 323 for (k = 0; k < numCoords; k++) {
msach@0 324 newClusters[i][k] += local_newClusters[j][i][k];
msach@0 325 local_newClusters[j][i][k] = 0.0;
msach@0 326 }
msach@0 327 }
msach@0 328 }
msach@0 329
msach@0 330 /* Average the sum and replace old cluster centers with newClusters */
msach@0 331 for (i = 0; i < numClusters; i++) {
msach@0 332 for (j = 0; j < numCoords; j++) {
msach@0 333 if (newClusterSize[i] > 1)
msach@0 334 clusters[i][j] = newClusters[i][j] / newClusterSize[i];
msach@0 335 newClusters[i][j] = 0.0; /* set back to 0 */
msach@0 336 }
msach@0 337 newClusterSize[i] = 0; /* set back to 0 */
msach@0 338 }
msach@0 339
msach@0 340 delta /= numObjs;
msach@0 341 } while (loop++ < PREC && delta > threshold);
msach@0 342
msach@1 343 // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program,
msach@0 344 // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed
msach@0 345 // iterations, therefore making benchmarking completely unreliable.
msach@0 346
msach@1 347 finished=1;
msach@1 348 barrier_wait(&barr, VProc);
msach@0 349
msach@0 350
msach@1 351 VPThread__free(ip, VProc);
msach@1 352 VPThread__free(thread, VProc);
msach@1 353
msach@1 354 VPThread__free(local_newClusterSize[0], VProc);
msach@1 355 VPThread__free(local_newClusterSize, VProc);
msach@0 356
msach@0 357 for (i = 0; i < nthreads; i++)
msach@0 358 for (j = 0; j < numClusters; j++)
msach@1 359 VPThread__free(local_newClusters[i][j], VProc);
msach@1 360 VPThread__free(local_newClusters[0], VProc);
msach@1 361 VPThread__free(local_newClusters, VProc);
msach@0 362
msach@1 363 VPThread__free(newClusters[0], VProc);
msach@1 364 VPThread__free(newClusters, VProc);
msach@1 365 VPThread__free(newClusterSize, VProc);
msach@1 366
msach@1 367 (cluster_data)->clusters = clusters;
msach@1 368
msach@1 369 VPThread__dissipate_thread(VProc);
msach@0 370 }
msach@0 371