| rev |
line source |
|
msach@0
|
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
|
|
msach@0
|
2 /* File: pthreads_kmeans.c (OpenMP version) */
|
|
msach@0
|
3 /* Description: Implementation of simple k-means clustering algorithm */
|
|
msach@0
|
4 /* This program takes an array of N data objects, each with */
|
|
msach@0
|
5 /* M coordinates and performs a k-means clustering given a */
|
|
msach@0
|
6 /* user-provided value of the number of clusters (K). The */
|
|
msach@0
|
7 /* clustering results are saved in 2 arrays: */
|
|
msach@0
|
8 /* 1. a returned array of size [K][N] indicating the center */
|
|
msach@0
|
9 /* coordinates of K clusters */
|
|
msach@0
|
10 /* 2. membership[N] stores the cluster center ids, each */
|
|
msach@0
|
11 /* corresponding to the cluster a data object is assigned */
|
|
msach@0
|
12 /* */
|
|
msach@0
|
13 /* Author: Wei-keng Liao */
|
|
msach@0
|
14 /* ECE Department, Northwestern University */
|
|
msach@0
|
15 /* email: wkliao@ece.northwestern.edu */
|
|
msach@0
|
16 /* Copyright, 2005, Wei-keng Liao */
|
|
msach@0
|
17 /* */
|
|
msach@0
|
18 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
|
|
msach@0
|
19
|
|
msach@0
|
20 #include <stdio.h>
|
|
msach@0
|
21 #include <stdlib.h>
|
|
msach@0
|
22 #include <pthread.h>
|
|
msach@0
|
23 #include <time.h>
|
|
msach@0
|
24 #include <math.h>
|
|
msach@0
|
25 #include "kmeans.h"
|
|
msach@0
|
26
|
|
msach@0
|
27 #define PREC 300
|
|
msach@0
|
28
|
|
msach@0
|
29 char __ProgrammName[] = "kmeans";
|
|
msach@0
|
30 char __DataSet[255];
|
|
msach@0
|
31
|
|
msach@0
|
32 extern int nthreads; /* Thread count */
|
|
msach@0
|
33 double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */
|
|
msach@0
|
34 volatile int finished;
|
|
msach@0
|
35
|
|
msach@0
|
36 pthread_barrier_t barr;
|
|
msach@0
|
37 pthread_mutex_t lock1;
|
|
msach@0
|
38 pthread_attr_t attr;
|
|
msach@0
|
39
|
|
msach@0
|
40 /*
|
|
msach@0
|
41 * Struct: input
|
|
msach@0
|
42 * -------------
|
|
msach@0
|
43 * Encapsulates all the input data for the benchmark, i.e. the object list,
|
|
msach@0
|
44 * clustering output and the input statistics.
|
|
msach@0
|
45 */
|
|
msach@0
|
46 struct input {
|
|
msach@0
|
47 int t;
|
|
msach@0
|
48 double **objects; /* Object list */
|
|
msach@0
|
49 double **clusters; /* Cluster list, out: [numClusters][numCoords] */
|
|
msach@0
|
50 int *membership; /* For each object, contains the index of the cluster it currently belongs to */
|
|
msach@0
|
51 int **local_newClusterSize; /* Thread-safe, [nthreads][numClusters] */
|
|
msach@0
|
52 double ***local_newClusters; /* Thread-safe, [nthreads][numClusters][numCoords] */
|
|
msach@0
|
53 int numObjs,numClusters,numCoords;
|
|
msach@0
|
54 };
|
|
msach@0
|
55
|
|
msach@0
|
56 /*
|
|
msach@0
|
57 * Function: euclid_dist_2
|
|
msach@0
|
58 * -----------------------
|
|
msach@0
|
59 * Computes the square of the euclidean distance between two multi-dimensional points.
|
|
msach@0
|
60 */
|
|
msach@0
|
61 __inline static double euclid_dist_2(int numdims, double *coord1, double *coord2) {
|
|
msach@0
|
62 int i;
|
|
msach@0
|
63 double ans=0.0;
|
|
msach@0
|
64
|
|
msach@0
|
65 for (i=0; i<numdims; i++)
|
|
msach@0
|
66 ans += (coord1[i]-coord2[i]) * (coord1[i]-coord2[i]);
|
|
msach@0
|
67
|
|
msach@0
|
68 return(ans);
|
|
msach@0
|
69 }
|
|
msach@0
|
70
|
|
msach@0
|
71 /*
|
|
msach@0
|
72 * Function: find_nearest_cluster
|
|
msach@0
|
73 * ------------------------------
|
|
msach@0
|
74 * Function determining the cluster center which is closest to the given object.
|
|
msach@0
|
75 * Returns the index of that cluster center.
|
|
msach@0
|
76 */
|
|
msach@0
|
77 __inline static int find_nearest_cluster(int numClusters, int numCoords, double *object, double **clusters) {
|
|
msach@0
|
78 int index, i;
|
|
msach@0
|
79 double dist, min_dist;
|
|
msach@0
|
80
|
|
msach@0
|
81 /* find the cluster id that has min distance to object */
|
|
msach@0
|
82 index = 0;
|
|
msach@0
|
83 min_dist = euclid_dist_2(numCoords, object, clusters[0]);
|
|
msach@0
|
84 for (i=1; i<numClusters; i++) {
|
|
msach@0
|
85 dist = euclid_dist_2(numCoords, object, clusters[i]);
|
|
msach@0
|
86
|
|
msach@0
|
87 /* no need square root */
|
|
msach@0
|
88 if (dist < min_dist) { /* find the min and its array index */
|
|
msach@0
|
89 min_dist = dist;
|
|
msach@0
|
90 index = i;
|
|
msach@0
|
91 }
|
|
msach@0
|
92 }
|
|
msach@0
|
93 return index;
|
|
msach@0
|
94 }
|
|
msach@0
|
95
|
|
msach@0
|
96 void work(struct input *x){
|
|
msach@0
|
97 int tid = x->t;
|
|
msach@0
|
98 double local_delta=0;
|
|
msach@0
|
99 int i;
|
|
msach@0
|
100 for (i = tid; i < x->numObjs; i += nthreads) {
|
|
msach@0
|
101 /* find the array index of nearest cluster center */
|
|
msach@0
|
102 int index = find_nearest_cluster(x->numClusters, x->numCoords,
|
|
msach@0
|
103 x->objects[i], x->clusters);
|
|
msach@0
|
104
|
|
msach@0
|
105 /* if membership changes, increase delta by 1 */
|
|
msach@0
|
106 if (x->membership[i] != index)
|
|
msach@0
|
107 local_delta += 1.0;
|
|
msach@0
|
108 /* assign the membership to object i */
|
|
msach@0
|
109 x->membership[i] = index;
|
|
msach@0
|
110
|
|
msach@0
|
111 /* update new cluster centers : sum of all objects located
|
|
msach@0
|
112 within (average will be performed later) */
|
|
msach@0
|
113 x->local_newClusterSize[tid][index]++;
|
|
msach@0
|
114 int j;
|
|
msach@0
|
115 for (j=0; j < x->numCoords; j++)
|
|
msach@0
|
116 x->local_newClusters[tid][index][j] += x->objects[i][j];
|
|
msach@0
|
117
|
|
msach@0
|
118 }
|
|
msach@0
|
119 pthread_mutex_lock(&lock1);
|
|
msach@0
|
120 delta +=local_delta;
|
|
msach@0
|
121 pthread_mutex_unlock(&lock1);
|
|
msach@0
|
122 }
|
|
msach@0
|
123 /*
|
|
msach@0
|
124 * Function: thread function work
|
|
msach@0
|
125 * --------------
|
|
msach@0
|
126 * Worker function for threading. Work distribution is done so that each thread computers
|
|
msach@0
|
127 */
|
|
msach@0
|
128 void* tfwork(void *ip)
|
|
msach@0
|
129 {
|
|
msach@0
|
130 struct input *x;
|
|
msach@0
|
131 x = (struct input *)ip;
|
|
msach@0
|
132
|
|
msach@0
|
133 for(;;){
|
|
msach@0
|
134 pthread_barrier_wait(&barr);
|
|
msach@0
|
135 if (finished){
|
|
msach@0
|
136 break;
|
|
msach@0
|
137 }
|
|
msach@0
|
138 work(x);
|
|
msach@0
|
139 pthread_barrier_wait(&barr);
|
|
msach@0
|
140 }
|
|
msach@0
|
141
|
|
msach@0
|
142 pthread_exit(NULL);
|
|
msach@0
|
143 }
|
|
msach@0
|
144
|
|
msach@0
|
145 /*
|
|
msach@0
|
146 * Function: create_array_2d_f
|
|
msach@0
|
147 * --------------------------
|
|
msach@0
|
148 * Allocates memory for a 2-dim double array as needed for the algorithm.
|
|
msach@0
|
149 */
|
|
msach@0
|
150 double** create_array_2d_f(int height, int width) {
|
|
msach@0
|
151 double** ptr;
|
|
msach@0
|
152 int i;
|
|
msach@0
|
153 ptr = calloc(height, sizeof(double*));
|
|
msach@0
|
154 assert(ptr != NULL);
|
|
msach@0
|
155 ptr[0] = calloc(width * height, sizeof(double));
|
|
msach@0
|
156 assert(ptr[0] != NULL);
|
|
msach@0
|
157 /* Assign pointers correctly */
|
|
msach@0
|
158 for(i = 1; i < height; i++)
|
|
msach@0
|
159 ptr[i] = ptr[i-1] + width;
|
|
msach@0
|
160 return ptr;
|
|
msach@0
|
161 }
|
|
msach@0
|
162
|
|
msach@0
|
163 /*
|
|
msach@0
|
164 * Function: create_array_2Dd_i
|
|
msach@0
|
165 * --------------------------
|
|
msach@0
|
166 * Allocates memory for a 2-dim integer array as needed for the algorithm.
|
|
msach@0
|
167 */
|
|
msach@0
|
168 int** create_array_2d_i(int height, int width) {
|
|
msach@0
|
169 int** ptr;
|
|
msach@0
|
170 int i;
|
|
msach@0
|
171 ptr = calloc(height, sizeof(int*));
|
|
msach@0
|
172 assert(ptr != NULL);
|
|
msach@0
|
173 ptr[0] = calloc(width * height, sizeof(int));
|
|
msach@0
|
174 assert(ptr[0] != NULL);
|
|
msach@0
|
175 /* Assign pointers correctly */
|
|
msach@0
|
176 for(i = 1; i < height; i++)
|
|
msach@0
|
177 ptr[i] = ptr[i-1] + width;
|
|
msach@0
|
178 return ptr;
|
|
msach@0
|
179 }
|
|
msach@0
|
180
|
|
msach@0
|
181 /*
|
|
msach@0
|
182 * Function: pthreads_kmeans
|
|
msach@0
|
183 * -------------------------
|
|
msach@0
|
184 * Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords].
|
|
msach@0
|
185 */
|
|
msach@0
|
186 double** pthreads_kmeans(int is_perform_atomic, /* in: */
|
|
msach@0
|
187 double **objects, /* in: [numObjs][numCoords] */
|
|
msach@0
|
188 int numCoords, /* no. coordinates */
|
|
msach@0
|
189 int numObjs, /* no. objects */
|
|
msach@0
|
190 int numClusters, /* no. clusters */
|
|
msach@0
|
191 double threshold, /* % objects change membership */
|
|
msach@0
|
192 int *membership) /* out: [numObjs] */
|
|
msach@0
|
193 {
|
|
msach@0
|
194
|
|
msach@0
|
195 int i, j, k, index, loop = 0, rc;
|
|
msach@0
|
196 int *newClusterSize; /* [numClusters]: no. objects assigned in each
|
|
msach@0
|
197 new cluster */
|
|
msach@0
|
198 double **clusters; /* out: [numClusters][numCoords] */
|
|
msach@0
|
199 double **newClusters; /* [numClusters][numCoords] */
|
|
msach@0
|
200 double timing;
|
|
msach@0
|
201 int **local_newClusterSize; /* [nthreads][numClusters] */
|
|
msach@0
|
202 double ***local_newClusters; /* [nthreads][numClusters][numCoords] */
|
|
msach@0
|
203
|
|
msach@0
|
204 pthread_t *thread;
|
|
msach@0
|
205
|
|
msach@0
|
206 /* === MEMORY SETUP === */
|
|
msach@0
|
207
|
|
msach@0
|
208 /* [numClusters] clusters of [numCoords] double coordinates each */
|
|
msach@0
|
209 clusters = create_array_2d_f(numClusters, numCoords);
|
|
msach@0
|
210
|
|
msach@0
|
211 /* Pick first numClusters elements of objects[] as initial cluster centers */
|
|
msach@0
|
212 for (i=0; i < numClusters; i++)
|
|
msach@0
|
213 for (j=0; j < numCoords; j++)
|
|
msach@0
|
214 clusters[i][j] = objects[i][j];
|
|
msach@0
|
215
|
|
msach@0
|
216 /* Initialize membership, no object belongs to any cluster yet */
|
|
msach@0
|
217 for (i = 0; i < numObjs; i++)
|
|
msach@0
|
218 membership[i] = -1;
|
|
msach@0
|
219
|
|
msach@0
|
220 /* newClusterSize holds information on the count of members in each cluster */
|
|
msach@0
|
221 newClusterSize = (int*)calloc(numClusters, sizeof(int));
|
|
msach@0
|
222 assert(newClusterSize != NULL);
|
|
msach@0
|
223
|
|
msach@0
|
224 /* newClusters holds the coordinates of the freshly created clusters */
|
|
msach@0
|
225 newClusters = create_array_2d_f(numClusters, numCoords);
|
|
msach@0
|
226 local_newClusterSize = create_array_2d_i(nthreads, numClusters);
|
|
msach@0
|
227
|
|
msach@0
|
228 /* local_newClusters is a 3D array */
|
|
msach@0
|
229 local_newClusters = (double***)malloc(nthreads * sizeof(double**));
|
|
msach@0
|
230 assert(local_newClusters != NULL);
|
|
msach@0
|
231 local_newClusters[0] = (double**) malloc(nthreads * numClusters * sizeof(double*));
|
|
msach@0
|
232 assert(local_newClusters[0] != NULL);
|
|
msach@0
|
233
|
|
msach@0
|
234 /* Set up the pointers */
|
|
msach@0
|
235 for (i = 1; i < nthreads; i++)
|
|
msach@0
|
236 local_newClusters[i] = local_newClusters[i-1] + numClusters;
|
|
msach@0
|
237
|
|
msach@0
|
238 for (i = 0; i < nthreads; i++) {
|
|
msach@0
|
239 for (j = 0; j < numClusters; j++) {
|
|
msach@0
|
240 local_newClusters[i][j] = (double*)calloc(numCoords, sizeof(double));
|
|
msach@0
|
241 assert(local_newClusters[i][j] != NULL);
|
|
msach@0
|
242 }
|
|
msach@0
|
243 }
|
|
msach@0
|
244 /* Perform thread setup */
|
|
msach@0
|
245 thread = (pthread_t*)calloc(nthreads, sizeof(pthread_t));
|
|
msach@0
|
246
|
|
msach@0
|
247 printf("nthreads %d\n", nthreads);
|
|
msach@0
|
248 pthread_barrier_init(&barr, NULL, nthreads);
|
|
msach@0
|
249 pthread_attr_init(&attr);
|
|
msach@0
|
250 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
|
|
msach@0
|
251 pthread_mutex_init(&lock1, NULL);
|
|
msach@0
|
252 finished=0;
|
|
msach@0
|
253
|
|
msach@0
|
254 struct input *ip = malloc(nthreads * sizeof(struct input));
|
|
msach@0
|
255 /* Provide thread-safe memory locations for each worker */
|
|
msach@0
|
256 for(i = 0; i < nthreads; i++){
|
|
msach@0
|
257 ip[i].t = i;
|
|
msach@0
|
258 ip[i].objects=objects;
|
|
msach@0
|
259 ip[i].clusters=clusters;
|
|
msach@0
|
260 ip[i].membership=membership;
|
|
msach@0
|
261 ip[i].local_newClusterSize=local_newClusterSize;
|
|
msach@0
|
262 ip[i].local_newClusters=local_newClusters;
|
|
msach@0
|
263 ip[i].numObjs=numObjs;
|
|
msach@0
|
264 ip[i].numClusters=numClusters;
|
|
msach@0
|
265 ip[i].numCoords=numCoords;
|
|
msach@0
|
266
|
|
msach@0
|
267 if (i>0){
|
|
msach@0
|
268 rc = pthread_create(&thread[i], &attr, tfwork, (void *)&ip[i]);
|
|
msach@0
|
269 if (rc) {
|
|
msach@0
|
270 fprintf(stderr, "ERROR: Return Code For Thread Creation Is %d\n", rc);
|
|
msach@0
|
271 exit(EXIT_FAILURE);
|
|
msach@0
|
272 }
|
|
msach@0
|
273 }
|
|
msach@0
|
274 }
|
|
msach@0
|
275
|
|
msach@0
|
276 /* === COMPUTATIONAL PHASE === */
|
|
msach@0
|
277
|
|
msach@0
|
278 do {
|
|
msach@0
|
279 delta = 0.0;
|
|
msach@0
|
280 pthread_barrier_wait(&barr);
|
|
msach@0
|
281 work(&ip[0]);
|
|
msach@0
|
282
|
|
msach@0
|
283 pthread_barrier_wait(&barr);
|
|
msach@0
|
284 /* Let the main thread perform the array reduction */
|
|
msach@0
|
285 for (i = 0; i < numClusters; i++) {
|
|
msach@0
|
286 for (j = 0; j < nthreads; j++) {
|
|
msach@0
|
287 newClusterSize[i] += local_newClusterSize[j][i];
|
|
msach@0
|
288 local_newClusterSize[j][i] = 0.0;
|
|
msach@0
|
289 for (k = 0; k < numCoords; k++) {
|
|
msach@0
|
290 newClusters[i][k] += local_newClusters[j][i][k];
|
|
msach@0
|
291 local_newClusters[j][i][k] = 0.0;
|
|
msach@0
|
292 }
|
|
msach@0
|
293 }
|
|
msach@0
|
294 }
|
|
msach@0
|
295
|
|
msach@0
|
296 /* Average the sum and replace old cluster centers with newClusters */
|
|
msach@0
|
297 for (i = 0; i < numClusters; i++) {
|
|
msach@0
|
298 for (j = 0; j < numCoords; j++) {
|
|
msach@0
|
299 if (newClusterSize[i] > 1)
|
|
msach@0
|
300 clusters[i][j] = newClusters[i][j] / newClusterSize[i];
|
|
msach@0
|
301 newClusters[i][j] = 0.0; /* set back to 0 */
|
|
msach@0
|
302 }
|
|
msach@0
|
303 newClusterSize[i] = 0; /* set back to 0 */
|
|
msach@0
|
304 }
|
|
msach@0
|
305
|
|
msach@0
|
306 delta /= numObjs;
|
|
msach@0
|
307 } while (loop++ < PREC && delta > threshold);
|
|
msach@0
|
308
|
|
msach@0
|
309 // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program,
|
|
msach@0
|
310 // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed
|
|
msach@0
|
311 // iterations, therefore making benchmarking completely unreliable.
|
|
msach@0
|
312
|
|
msach@0
|
313 finished=1;
|
|
msach@0
|
314 pthread_barrier_wait(&barr);
|
|
msach@0
|
315
|
|
msach@0
|
316 for(i = 1; i < nthreads; i++) {
|
|
msach@0
|
317 rc = pthread_join(thread[i], NULL);
|
|
msach@0
|
318 if (rc) {
|
|
msach@0
|
319 fprintf(stderr, "ERROR: Return Code For Thread Join Is %d\n", rc);
|
|
msach@0
|
320 exit(EXIT_FAILURE);
|
|
msach@0
|
321 }
|
|
msach@0
|
322 }
|
|
msach@0
|
323
|
|
msach@0
|
324 free(ip);
|
|
msach@0
|
325 free(thread);
|
|
msach@0
|
326 pthread_barrier_destroy(&barr);
|
|
msach@0
|
327 pthread_mutex_destroy(&lock1);
|
|
msach@0
|
328 pthread_attr_destroy(&attr);
|
|
msach@0
|
329
|
|
msach@0
|
330 free(local_newClusterSize[0]);
|
|
msach@0
|
331 free(local_newClusterSize);
|
|
msach@0
|
332
|
|
msach@0
|
333 for (i = 0; i < nthreads; i++)
|
|
msach@0
|
334 for (j = 0; j < numClusters; j++)
|
|
msach@0
|
335 free(local_newClusters[i][j]);
|
|
msach@0
|
336 free(local_newClusters[0]);
|
|
msach@0
|
337 free(local_newClusters);
|
|
msach@0
|
338
|
|
msach@0
|
339 free(newClusters[0]);
|
|
msach@0
|
340 free(newClusters);
|
|
msach@0
|
341 free(newClusterSize);
|
|
msach@0
|
342 return clusters;
|
|
msach@0
|
343 }
|
|
msach@0
|
344
|