Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > Vthread > Vthread__KMeans__Bench
view pthreads_kmeans.c @ 0:e69e4c2d612a
Initial pthreads version
| author | Merten Sach <msach@mailbox.tu-berlin.de> |
|---|---|
| date | Wed, 03 Aug 2011 19:30:34 +0200 |
| parents | |
| children | 8e7bdab2840f |
line source
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2 /* File: pthreads_kmeans.c (OpenMP version) */
3 /* Description: Implementation of simple k-means clustering algorithm */
4 /* This program takes an array of N data objects, each with */
5 /* M coordinates and performs a k-means clustering given a */
6 /* user-provided value of the number of clusters (K). The */
7 /* clustering results are saved in 2 arrays: */
8 /* 1. a returned array of size [K][N] indicating the center */
9 /* coordinates of K clusters */
10 /* 2. membership[N] stores the cluster center ids, each */
11 /* corresponding to the cluster a data object is assigned */
12 /* */
13 /* Author: Wei-keng Liao */
14 /* ECE Department, Northwestern University */
15 /* email: wkliao@ece.northwestern.edu */
16 /* Copyright, 2005, Wei-keng Liao */
17 /* */
18 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <pthread.h>
23 #include <time.h>
24 #include <math.h>
25 #include "kmeans.h"
27 #define PREC 300
29 char __ProgrammName[] = "kmeans";
30 char __DataSet[255];
32 extern int nthreads; /* Thread count */
33 double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */
34 volatile int finished;
36 pthread_barrier_t barr;
37 pthread_mutex_t lock1;
38 pthread_attr_t attr;
40 /*
41 * Struct: input
42 * -------------
43 * Encapsulates all the input data for the benchmark, i.e. the object list,
44 * clustering output and the input statistics.
45 */
46 struct input {
47 int t;
48 double **objects; /* Object list */
49 double **clusters; /* Cluster list, out: [numClusters][numCoords] */
50 int *membership; /* For each object, contains the index of the cluster it currently belongs to */
51 int **local_newClusterSize; /* Thread-safe, [nthreads][numClusters] */
52 double ***local_newClusters; /* Thread-safe, [nthreads][numClusters][numCoords] */
53 int numObjs,numClusters,numCoords;
54 };
56 /*
57 * Function: euclid_dist_2
58 * -----------------------
59 * Computes the square of the euclidean distance between two multi-dimensional points.
60 */
61 __inline static double euclid_dist_2(int numdims, double *coord1, double *coord2) {
62 int i;
63 double ans=0.0;
65 for (i=0; i<numdims; i++)
66 ans += (coord1[i]-coord2[i]) * (coord1[i]-coord2[i]);
68 return(ans);
69 }
71 /*
72 * Function: find_nearest_cluster
73 * ------------------------------
74 * Function determining the cluster center which is closest to the given object.
75 * Returns the index of that cluster center.
76 */
77 __inline static int find_nearest_cluster(int numClusters, int numCoords, double *object, double **clusters) {
78 int index, i;
79 double dist, min_dist;
81 /* find the cluster id that has min distance to object */
82 index = 0;
83 min_dist = euclid_dist_2(numCoords, object, clusters[0]);
84 for (i=1; i<numClusters; i++) {
85 dist = euclid_dist_2(numCoords, object, clusters[i]);
87 /* no need square root */
88 if (dist < min_dist) { /* find the min and its array index */
89 min_dist = dist;
90 index = i;
91 }
92 }
93 return index;
94 }
96 void work(struct input *x){
97 int tid = x->t;
98 double local_delta=0;
99 int i;
100 for (i = tid; i < x->numObjs; i += nthreads) {
101 /* find the array index of nearest cluster center */
102 int index = find_nearest_cluster(x->numClusters, x->numCoords,
103 x->objects[i], x->clusters);
105 /* if membership changes, increase delta by 1 */
106 if (x->membership[i] != index)
107 local_delta += 1.0;
108 /* assign the membership to object i */
109 x->membership[i] = index;
111 /* update new cluster centers : sum of all objects located
112 within (average will be performed later) */
113 x->local_newClusterSize[tid][index]++;
114 int j;
115 for (j=0; j < x->numCoords; j++)
116 x->local_newClusters[tid][index][j] += x->objects[i][j];
118 }
119 pthread_mutex_lock(&lock1);
120 delta +=local_delta;
121 pthread_mutex_unlock(&lock1);
122 }
123 /*
124 * Function: thread function work
125 * --------------
126 * Worker function for threading. Work distribution is done so that each thread computers
127 */
128 void* tfwork(void *ip)
129 {
130 struct input *x;
131 x = (struct input *)ip;
133 for(;;){
134 pthread_barrier_wait(&barr);
135 if (finished){
136 break;
137 }
138 work(x);
139 pthread_barrier_wait(&barr);
140 }
142 pthread_exit(NULL);
143 }
145 /*
146 * Function: create_array_2d_f
147 * --------------------------
148 * Allocates memory for a 2-dim double array as needed for the algorithm.
149 */
150 double** create_array_2d_f(int height, int width) {
151 double** ptr;
152 int i;
153 ptr = calloc(height, sizeof(double*));
154 assert(ptr != NULL);
155 ptr[0] = calloc(width * height, sizeof(double));
156 assert(ptr[0] != NULL);
157 /* Assign pointers correctly */
158 for(i = 1; i < height; i++)
159 ptr[i] = ptr[i-1] + width;
160 return ptr;
161 }
163 /*
164 * Function: create_array_2Dd_i
165 * --------------------------
166 * Allocates memory for a 2-dim integer array as needed for the algorithm.
167 */
168 int** create_array_2d_i(int height, int width) {
169 int** ptr;
170 int i;
171 ptr = calloc(height, sizeof(int*));
172 assert(ptr != NULL);
173 ptr[0] = calloc(width * height, sizeof(int));
174 assert(ptr[0] != NULL);
175 /* Assign pointers correctly */
176 for(i = 1; i < height; i++)
177 ptr[i] = ptr[i-1] + width;
178 return ptr;
179 }
181 /*
182 * Function: pthreads_kmeans
183 * -------------------------
184 * Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords].
185 */
186 double** pthreads_kmeans(int is_perform_atomic, /* in: */
187 double **objects, /* in: [numObjs][numCoords] */
188 int numCoords, /* no. coordinates */
189 int numObjs, /* no. objects */
190 int numClusters, /* no. clusters */
191 double threshold, /* % objects change membership */
192 int *membership) /* out: [numObjs] */
193 {
195 int i, j, k, index, loop = 0, rc;
196 int *newClusterSize; /* [numClusters]: no. objects assigned in each
197 new cluster */
198 double **clusters; /* out: [numClusters][numCoords] */
199 double **newClusters; /* [numClusters][numCoords] */
200 double timing;
201 int **local_newClusterSize; /* [nthreads][numClusters] */
202 double ***local_newClusters; /* [nthreads][numClusters][numCoords] */
204 pthread_t *thread;
206 /* === MEMORY SETUP === */
208 /* [numClusters] clusters of [numCoords] double coordinates each */
209 clusters = create_array_2d_f(numClusters, numCoords);
211 /* Pick first numClusters elements of objects[] as initial cluster centers */
212 for (i=0; i < numClusters; i++)
213 for (j=0; j < numCoords; j++)
214 clusters[i][j] = objects[i][j];
216 /* Initialize membership, no object belongs to any cluster yet */
217 for (i = 0; i < numObjs; i++)
218 membership[i] = -1;
220 /* newClusterSize holds information on the count of members in each cluster */
221 newClusterSize = (int*)calloc(numClusters, sizeof(int));
222 assert(newClusterSize != NULL);
224 /* newClusters holds the coordinates of the freshly created clusters */
225 newClusters = create_array_2d_f(numClusters, numCoords);
226 local_newClusterSize = create_array_2d_i(nthreads, numClusters);
228 /* local_newClusters is a 3D array */
229 local_newClusters = (double***)malloc(nthreads * sizeof(double**));
230 assert(local_newClusters != NULL);
231 local_newClusters[0] = (double**) malloc(nthreads * numClusters * sizeof(double*));
232 assert(local_newClusters[0] != NULL);
234 /* Set up the pointers */
235 for (i = 1; i < nthreads; i++)
236 local_newClusters[i] = local_newClusters[i-1] + numClusters;
238 for (i = 0; i < nthreads; i++) {
239 for (j = 0; j < numClusters; j++) {
240 local_newClusters[i][j] = (double*)calloc(numCoords, sizeof(double));
241 assert(local_newClusters[i][j] != NULL);
242 }
243 }
244 /* Perform thread setup */
245 thread = (pthread_t*)calloc(nthreads, sizeof(pthread_t));
247 printf("nthreads %d\n", nthreads);
248 pthread_barrier_init(&barr, NULL, nthreads);
249 pthread_attr_init(&attr);
250 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
251 pthread_mutex_init(&lock1, NULL);
252 finished=0;
254 struct input *ip = malloc(nthreads * sizeof(struct input));
255 /* Provide thread-safe memory locations for each worker */
256 for(i = 0; i < nthreads; i++){
257 ip[i].t = i;
258 ip[i].objects=objects;
259 ip[i].clusters=clusters;
260 ip[i].membership=membership;
261 ip[i].local_newClusterSize=local_newClusterSize;
262 ip[i].local_newClusters=local_newClusters;
263 ip[i].numObjs=numObjs;
264 ip[i].numClusters=numClusters;
265 ip[i].numCoords=numCoords;
267 if (i>0){
268 rc = pthread_create(&thread[i], &attr, tfwork, (void *)&ip[i]);
269 if (rc) {
270 fprintf(stderr, "ERROR: Return Code For Thread Creation Is %d\n", rc);
271 exit(EXIT_FAILURE);
272 }
273 }
274 }
276 /* === COMPUTATIONAL PHASE === */
278 do {
279 delta = 0.0;
280 pthread_barrier_wait(&barr);
281 work(&ip[0]);
283 pthread_barrier_wait(&barr);
284 /* Let the main thread perform the array reduction */
285 for (i = 0; i < numClusters; i++) {
286 for (j = 0; j < nthreads; j++) {
287 newClusterSize[i] += local_newClusterSize[j][i];
288 local_newClusterSize[j][i] = 0.0;
289 for (k = 0; k < numCoords; k++) {
290 newClusters[i][k] += local_newClusters[j][i][k];
291 local_newClusters[j][i][k] = 0.0;
292 }
293 }
294 }
296 /* Average the sum and replace old cluster centers with newClusters */
297 for (i = 0; i < numClusters; i++) {
298 for (j = 0; j < numCoords; j++) {
299 if (newClusterSize[i] > 1)
300 clusters[i][j] = newClusters[i][j] / newClusterSize[i];
301 newClusters[i][j] = 0.0; /* set back to 0 */
302 }
303 newClusterSize[i] = 0; /* set back to 0 */
304 }
306 delta /= numObjs;
307 } while (loop++ < PREC && delta > threshold);
309 // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program,
310 // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed
311 // iterations, therefore making benchmarking completely unreliable.
313 finished=1;
314 pthread_barrier_wait(&barr);
316 for(i = 1; i < nthreads; i++) {
317 rc = pthread_join(thread[i], NULL);
318 if (rc) {
319 fprintf(stderr, "ERROR: Return Code For Thread Join Is %d\n", rc);
320 exit(EXIT_FAILURE);
321 }
322 }
324 free(ip);
325 free(thread);
326 pthread_barrier_destroy(&barr);
327 pthread_mutex_destroy(&lock1);
328 pthread_attr_destroy(&attr);
330 free(local_newClusterSize[0]);
331 free(local_newClusterSize);
333 for (i = 0; i < nthreads; i++)
334 for (j = 0; j < numClusters; j++)
335 free(local_newClusters[i][j]);
336 free(local_newClusters[0]);
337 free(local_newClusters);
339 free(newClusters[0]);
340 free(newClusters);
341 free(newClusterSize);
342 return clusters;
343 }
