Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > Vthread > Vthread__KMeans__Bench
diff pthreads_kmeans.c @ 0:e69e4c2d612a
Initial pthreads version
| author | Merten Sach <msach@mailbox.tu-berlin.de> |
|---|---|
| date | Wed, 03 Aug 2011 19:30:34 +0200 |
| parents | |
| children | 8e7bdab2840f |
line diff
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/pthreads_kmeans.c Wed Aug 03 19:30:34 2011 +0200 1.3 @@ -0,0 +1,344 @@ 1.4 +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 1.5 +/* File: pthreads_kmeans.c (OpenMP version) */ 1.6 +/* Description: Implementation of simple k-means clustering algorithm */ 1.7 +/* This program takes an array of N data objects, each with */ 1.8 +/* M coordinates and performs a k-means clustering given a */ 1.9 +/* user-provided value of the number of clusters (K). The */ 1.10 +/* clustering results are saved in 2 arrays: */ 1.11 +/* 1. a returned array of size [K][N] indicating the center */ 1.12 +/* coordinates of K clusters */ 1.13 +/* 2. membership[N] stores the cluster center ids, each */ 1.14 +/* corresponding to the cluster a data object is assigned */ 1.15 +/* */ 1.16 +/* Author: Wei-keng Liao */ 1.17 +/* ECE Department, Northwestern University */ 1.18 +/* email: wkliao@ece.northwestern.edu */ 1.19 +/* Copyright, 2005, Wei-keng Liao */ 1.20 +/* */ 1.21 +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 1.22 + 1.23 +#include <stdio.h> 1.24 +#include <stdlib.h> 1.25 +#include <pthread.h> 1.26 +#include <time.h> 1.27 +#include <math.h> 1.28 +#include "kmeans.h" 1.29 + 1.30 +#define PREC 300 1.31 + 1.32 +char __ProgrammName[] = "kmeans"; 1.33 +char __DataSet[255]; 1.34 + 1.35 +extern int nthreads; /* Thread count */ 1.36 +double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */ 1.37 +volatile int finished; 1.38 + 1.39 +pthread_barrier_t barr; 1.40 +pthread_mutex_t lock1; 1.41 +pthread_attr_t attr; 1.42 + 1.43 +/* 1.44 +* Struct: input 1.45 +* ------------- 1.46 +* Encapsulates all the input data for the benchmark, i.e. the object list, 1.47 +* clustering output and the input statistics. 1.48 +*/ 1.49 +struct input { 1.50 + int t; 1.51 + double **objects; /* Object list */ 1.52 + double **clusters; /* Cluster list, out: [numClusters][numCoords] */ 1.53 + int *membership; /* For each object, contains the index of the cluster it currently belongs to */ 1.54 + int **local_newClusterSize; /* Thread-safe, [nthreads][numClusters] */ 1.55 + double ***local_newClusters; /* Thread-safe, [nthreads][numClusters][numCoords] */ 1.56 + int numObjs,numClusters,numCoords; 1.57 +}; 1.58 + 1.59 +/* 1.60 +* Function: euclid_dist_2 1.61 +* ----------------------- 1.62 +* Computes the square of the euclidean distance between two multi-dimensional points. 1.63 +*/ 1.64 +__inline static double euclid_dist_2(int numdims, double *coord1, double *coord2) { 1.65 + int i; 1.66 + double ans=0.0; 1.67 + 1.68 + for (i=0; i<numdims; i++) 1.69 + ans += (coord1[i]-coord2[i]) * (coord1[i]-coord2[i]); 1.70 + 1.71 + return(ans); 1.72 +} 1.73 + 1.74 +/* 1.75 +* Function: find_nearest_cluster 1.76 +* ------------------------------ 1.77 +* Function determining the cluster center which is closest to the given object. 1.78 +* Returns the index of that cluster center. 1.79 +*/ 1.80 +__inline static int find_nearest_cluster(int numClusters, int numCoords, double *object, double **clusters) { 1.81 + int index, i; 1.82 + double dist, min_dist; 1.83 + 1.84 + /* find the cluster id that has min distance to object */ 1.85 + index = 0; 1.86 + min_dist = euclid_dist_2(numCoords, object, clusters[0]); 1.87 + for (i=1; i<numClusters; i++) { 1.88 + dist = euclid_dist_2(numCoords, object, clusters[i]); 1.89 + 1.90 + /* no need square root */ 1.91 + if (dist < min_dist) { /* find the min and its array index */ 1.92 + min_dist = dist; 1.93 + index = i; 1.94 + } 1.95 + } 1.96 + return index; 1.97 +} 1.98 + 1.99 +void work(struct input *x){ 1.100 + int tid = x->t; 1.101 + double local_delta=0; 1.102 + int i; 1.103 + for (i = tid; i < x->numObjs; i += nthreads) { 1.104 + /* find the array index of nearest cluster center */ 1.105 + int index = find_nearest_cluster(x->numClusters, x->numCoords, 1.106 + x->objects[i], x->clusters); 1.107 + 1.108 + /* if membership changes, increase delta by 1 */ 1.109 + if (x->membership[i] != index) 1.110 + local_delta += 1.0; 1.111 + /* assign the membership to object i */ 1.112 + x->membership[i] = index; 1.113 + 1.114 + /* update new cluster centers : sum of all objects located 1.115 + within (average will be performed later) */ 1.116 + x->local_newClusterSize[tid][index]++; 1.117 + int j; 1.118 + for (j=0; j < x->numCoords; j++) 1.119 + x->local_newClusters[tid][index][j] += x->objects[i][j]; 1.120 + 1.121 + } 1.122 + pthread_mutex_lock(&lock1); 1.123 + delta +=local_delta; 1.124 + pthread_mutex_unlock(&lock1); 1.125 +} 1.126 +/* 1.127 +* Function: thread function work 1.128 +* -------------- 1.129 +* Worker function for threading. Work distribution is done so that each thread computers 1.130 +*/ 1.131 +void* tfwork(void *ip) 1.132 +{ 1.133 + struct input *x; 1.134 + x = (struct input *)ip; 1.135 + 1.136 + for(;;){ 1.137 + pthread_barrier_wait(&barr); 1.138 + if (finished){ 1.139 + break; 1.140 + } 1.141 + work(x); 1.142 + pthread_barrier_wait(&barr); 1.143 + } 1.144 + 1.145 + pthread_exit(NULL); 1.146 +} 1.147 + 1.148 +/* 1.149 +* Function: create_array_2d_f 1.150 +* -------------------------- 1.151 +* Allocates memory for a 2-dim double array as needed for the algorithm. 1.152 +*/ 1.153 +double** create_array_2d_f(int height, int width) { 1.154 + double** ptr; 1.155 + int i; 1.156 + ptr = calloc(height, sizeof(double*)); 1.157 + assert(ptr != NULL); 1.158 + ptr[0] = calloc(width * height, sizeof(double)); 1.159 + assert(ptr[0] != NULL); 1.160 + /* Assign pointers correctly */ 1.161 + for(i = 1; i < height; i++) 1.162 + ptr[i] = ptr[i-1] + width; 1.163 + return ptr; 1.164 +} 1.165 + 1.166 +/* 1.167 +* Function: create_array_2Dd_i 1.168 +* -------------------------- 1.169 +* Allocates memory for a 2-dim integer array as needed for the algorithm. 1.170 +*/ 1.171 +int** create_array_2d_i(int height, int width) { 1.172 + int** ptr; 1.173 + int i; 1.174 + ptr = calloc(height, sizeof(int*)); 1.175 + assert(ptr != NULL); 1.176 + ptr[0] = calloc(width * height, sizeof(int)); 1.177 + assert(ptr[0] != NULL); 1.178 + /* Assign pointers correctly */ 1.179 + for(i = 1; i < height; i++) 1.180 + ptr[i] = ptr[i-1] + width; 1.181 + return ptr; 1.182 +} 1.183 + 1.184 +/* 1.185 +* Function: pthreads_kmeans 1.186 +* ------------------------- 1.187 +* Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords]. 1.188 +*/ 1.189 +double** pthreads_kmeans(int is_perform_atomic, /* in: */ 1.190 + double **objects, /* in: [numObjs][numCoords] */ 1.191 + int numCoords, /* no. coordinates */ 1.192 + int numObjs, /* no. objects */ 1.193 + int numClusters, /* no. clusters */ 1.194 + double threshold, /* % objects change membership */ 1.195 + int *membership) /* out: [numObjs] */ 1.196 +{ 1.197 + 1.198 + int i, j, k, index, loop = 0, rc; 1.199 + int *newClusterSize; /* [numClusters]: no. objects assigned in each 1.200 + new cluster */ 1.201 + double **clusters; /* out: [numClusters][numCoords] */ 1.202 + double **newClusters; /* [numClusters][numCoords] */ 1.203 + double timing; 1.204 + int **local_newClusterSize; /* [nthreads][numClusters] */ 1.205 + double ***local_newClusters; /* [nthreads][numClusters][numCoords] */ 1.206 + 1.207 + pthread_t *thread; 1.208 + 1.209 + /* === MEMORY SETUP === */ 1.210 + 1.211 + /* [numClusters] clusters of [numCoords] double coordinates each */ 1.212 + clusters = create_array_2d_f(numClusters, numCoords); 1.213 + 1.214 + /* Pick first numClusters elements of objects[] as initial cluster centers */ 1.215 + for (i=0; i < numClusters; i++) 1.216 + for (j=0; j < numCoords; j++) 1.217 + clusters[i][j] = objects[i][j]; 1.218 + 1.219 + /* Initialize membership, no object belongs to any cluster yet */ 1.220 + for (i = 0; i < numObjs; i++) 1.221 + membership[i] = -1; 1.222 + 1.223 + /* newClusterSize holds information on the count of members in each cluster */ 1.224 + newClusterSize = (int*)calloc(numClusters, sizeof(int)); 1.225 + assert(newClusterSize != NULL); 1.226 + 1.227 + /* newClusters holds the coordinates of the freshly created clusters */ 1.228 + newClusters = create_array_2d_f(numClusters, numCoords); 1.229 + local_newClusterSize = create_array_2d_i(nthreads, numClusters); 1.230 + 1.231 + /* local_newClusters is a 3D array */ 1.232 + local_newClusters = (double***)malloc(nthreads * sizeof(double**)); 1.233 + assert(local_newClusters != NULL); 1.234 + local_newClusters[0] = (double**) malloc(nthreads * numClusters * sizeof(double*)); 1.235 + assert(local_newClusters[0] != NULL); 1.236 + 1.237 + /* Set up the pointers */ 1.238 + for (i = 1; i < nthreads; i++) 1.239 + local_newClusters[i] = local_newClusters[i-1] + numClusters; 1.240 + 1.241 + for (i = 0; i < nthreads; i++) { 1.242 + for (j = 0; j < numClusters; j++) { 1.243 + local_newClusters[i][j] = (double*)calloc(numCoords, sizeof(double)); 1.244 + assert(local_newClusters[i][j] != NULL); 1.245 + } 1.246 + } 1.247 + /* Perform thread setup */ 1.248 + thread = (pthread_t*)calloc(nthreads, sizeof(pthread_t)); 1.249 + 1.250 + printf("nthreads %d\n", nthreads); 1.251 + pthread_barrier_init(&barr, NULL, nthreads); 1.252 + pthread_attr_init(&attr); 1.253 + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 1.254 + pthread_mutex_init(&lock1, NULL); 1.255 + finished=0; 1.256 + 1.257 + struct input *ip = malloc(nthreads * sizeof(struct input)); 1.258 + /* Provide thread-safe memory locations for each worker */ 1.259 + for(i = 0; i < nthreads; i++){ 1.260 + ip[i].t = i; 1.261 + ip[i].objects=objects; 1.262 + ip[i].clusters=clusters; 1.263 + ip[i].membership=membership; 1.264 + ip[i].local_newClusterSize=local_newClusterSize; 1.265 + ip[i].local_newClusters=local_newClusters; 1.266 + ip[i].numObjs=numObjs; 1.267 + ip[i].numClusters=numClusters; 1.268 + ip[i].numCoords=numCoords; 1.269 + 1.270 + if (i>0){ 1.271 + rc = pthread_create(&thread[i], &attr, tfwork, (void *)&ip[i]); 1.272 + if (rc) { 1.273 + fprintf(stderr, "ERROR: Return Code For Thread Creation Is %d\n", rc); 1.274 + exit(EXIT_FAILURE); 1.275 + } 1.276 + } 1.277 + } 1.278 + 1.279 + /* === COMPUTATIONAL PHASE === */ 1.280 + 1.281 + do { 1.282 + delta = 0.0; 1.283 + pthread_barrier_wait(&barr); 1.284 + work(&ip[0]); 1.285 + 1.286 + pthread_barrier_wait(&barr); 1.287 + /* Let the main thread perform the array reduction */ 1.288 + for (i = 0; i < numClusters; i++) { 1.289 + for (j = 0; j < nthreads; j++) { 1.290 + newClusterSize[i] += local_newClusterSize[j][i]; 1.291 + local_newClusterSize[j][i] = 0.0; 1.292 + for (k = 0; k < numCoords; k++) { 1.293 + newClusters[i][k] += local_newClusters[j][i][k]; 1.294 + local_newClusters[j][i][k] = 0.0; 1.295 + } 1.296 + } 1.297 + } 1.298 + 1.299 + /* Average the sum and replace old cluster centers with newClusters */ 1.300 + for (i = 0; i < numClusters; i++) { 1.301 + for (j = 0; j < numCoords; j++) { 1.302 + if (newClusterSize[i] > 1) 1.303 + clusters[i][j] = newClusters[i][j] / newClusterSize[i]; 1.304 + newClusters[i][j] = 0.0; /* set back to 0 */ 1.305 + } 1.306 + newClusterSize[i] = 0; /* set back to 0 */ 1.307 + } 1.308 + 1.309 + delta /= numObjs; 1.310 + } while (loop++ < PREC && delta > threshold); 1.311 + 1.312 + // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program, 1.313 + // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed 1.314 + // iterations, therefore making benchmarking completely unreliable. 1.315 + 1.316 + finished=1; 1.317 + pthread_barrier_wait(&barr); 1.318 + 1.319 + for(i = 1; i < nthreads; i++) { 1.320 + rc = pthread_join(thread[i], NULL); 1.321 + if (rc) { 1.322 + fprintf(stderr, "ERROR: Return Code For Thread Join Is %d\n", rc); 1.323 + exit(EXIT_FAILURE); 1.324 + } 1.325 + } 1.326 + 1.327 + free(ip); 1.328 + free(thread); 1.329 + pthread_barrier_destroy(&barr); 1.330 + pthread_mutex_destroy(&lock1); 1.331 + pthread_attr_destroy(&attr); 1.332 + 1.333 + free(local_newClusterSize[0]); 1.334 + free(local_newClusterSize); 1.335 + 1.336 + for (i = 0; i < nthreads; i++) 1.337 + for (j = 0; j < numClusters; j++) 1.338 + free(local_newClusters[i][j]); 1.339 + free(local_newClusters[0]); 1.340 + free(local_newClusters); 1.341 + 1.342 + free(newClusters[0]); 1.343 + free(newClusters); 1.344 + free(newClusterSize); 1.345 + return clusters; 1.346 +} 1.347 +
