The algorithm implemented here is the standard Lloyd's algorithm. The code is not mine; it ships with DPark as an example. DPark is a Python clone of Spark written by a group of enthusiastic developers. Even though Spark now provides PySpark as its official Python API, DPark is still an impressive project. Credit where credit is due. :)
Note: this is not a tutorial on Spark or DPark. For more on Spark in Python, check the PySpark documentation or the DPark manual (in Chinese).
The original source code is from here.
Lloyd's algorithm
- Input:
  - a set of n data points
  - k randomly initialized centroids
- Repeat until convergence (see the plain-Python sketch after this list):
  - assign each point to the nearest centroid based on Euclidean distance
  - update each cluster centroid to be the mean of all the points assigned to it
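To make the two steps concrete, here is a minimal, self-contained sketch of Lloyd's algorithm in plain Python. All names here (lloyd, min_dist, and so on) are my own, not from the DPark example; the distributed version below follows exactly this structure.

import random

def lloyd(points, k, iterations=10, min_dist=0.01):
    # points: a list of equal-length tuples, e.g. [(1.0, 2.0), (3.0, 4.0), ...]
    dim = len(points[0])
    centers = [tuple(random.random() for _ in range(dim)) for _ in range(k)]
    for _ in range(iterations):
        # assignment step: group each point under its nearest centroid
        clusters = [[] for _ in range(k)]
        for p in points:
            dists = [sum((a - b) ** 2 for a, b in zip(p, c)) for c in centers]
            clusters[dists.index(min(dists))].append(p)
        # update step: move each centroid to the mean of its points
        updated = False
        for i, cluster in enumerate(clusters):
            if not cluster:
                continue  # leave an empty cluster's centroid where it is
            mean = tuple(sum(coord) / float(len(cluster)) for coord in zip(*cluster))
            # comparing squared distances against min_dist ** 2 is equivalent
            # to comparing Euclidean distances against min_dist
            if sum((a - b) ** 2 for a, b in zip(mean, centers[i])) > min_dist ** 2:
                centers[i] = mean
                updated = True
        if not updated:
            break  # no centroid moved: converged
    return centers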
The main function
Here are some Spark methods used in the main function. The link on each function name points to the PySpark documentation; the second link ("src") points to the DPark source.
textFile(): reads a file from the file system into an RDD. src
cache(): caches this RDD in memory, so it is not recomputed on every iteration. src
map(): returns a new RDD by applying the provided function to each element of this RDD. src
In the K-means implementation:
mappedPoints = points.map(lambda p: (closestCenter(p, centers), (p, 1)))
maps each point p through closestCenter(p, centers), which assigns the point to its closest center, and returns a key-value pair where the key is the centroid index and the value is the tuple (point, 1); the 1 is the count.
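Outside DPark, the same transformation can be simulated with a plain list comprehension over a few made-up points (using the Vector and closestCenter() helpers described later in this post; the output shown in the comments is illustrative):

points = [Vector([0.0, 0.0]), Vector([0.1, 0.1]), Vector([9.0, 9.0])]
centers = [Vector([0.0, 0.0]), Vector([10.0, 10.0])]

mappedPoints = [(closestCenter(p, centers), (p, 1)) for p in points]
# -> [(0, (<Vector [0.0, 0.0]>, 1)),
#     (0, (<Vector [0.1, 0.1]>, 1)),
#     (1, (<Vector [9.0, 9.0]>, 1))]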
reduceByKey(): merges the values for each key using the provided function. src
In the K-means implementation:
mappedPoints.reduceByKey( lambda (s1,c1),(s2,c2): (s1+s2,c1+c2) )
aggregates the mappedPoints key-value pairs by key, summing the (point, count) values component-wise, and returns (centroid index, (sum of points, sum of counts)).
collectAsMap(): returns the key-value pairs in this RDD to the master as a dictionary. src
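To see what reduceByKey() plus collectAsMap() accomplish without a cluster, here is a rough single-machine equivalent built on a dictionary (my own sketch, not DPark code), applied to the mappedPoints list from the sketch above:

def reduce_by_key_locally(pairs, func):
    # rough stand-in for RDD.reduceByKey(func).collectAsMap()
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

# values are (point, count) pairs: sum the points and the counts separately
sums = reduce_by_key_locally(mappedPoints,
                             lambda a, b: (a[0] + b[0], a[1] + b[1]))
# sums maps centroid index -> (sum of points, count), e.g. {0: (..., 2), 1: (..., 1)}
ncenters = dict((i, s / c) for i, (s, c) in sums.items())  # mean = sum / count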
if __name__ == '__main__':
    # initialization
    D = 4            # dimension of each data point
    K = 3            # number of clusters
    IT = 10          # number of iterations
    MIN_DIST = 0.01  # threshold for convergence

    # initialize k random centroids
    centers = [Vector([random.random() for j in range(D)]) for i in range(K)]

    # read the data points from file and parse them to vectors
    points = dpark.textFile('kmeans_data.txt').map(parseVector).cache()

    # iteration
    for it in range(IT):
        print 'iteration', it
        # assign each point to the closest centroid
        # return (centroid index, (point, count = 1))
        mappedPoints = points.map(lambda p: (closestCenter(p, centers), (p, 1)))
        # calculate the new center of the points within each cluster:
        # reduceByKey() returns (centroid index, (sum of points, sum of counts)),
        # then map the output to (centroid index, sum of points / sum of counts);
        # sum of points / sum of counts is the new center of the cluster
        ncenters = mappedPoints.reduceByKey(
                lambda (s1, c1), (s2, c2): (s1 + s2, c1 + c2)
            ).map(
                lambda (id, (sum, count)): (id, sum / count)
            ).collectAsMap()

        updated = False
        # update the centers
        for i in ncenters:
            # if the distance between the old center and the new center is
            # greater than MIN_DIST, update the center to the new center
            if centers[i].dist(ncenters[i]) > MIN_DIST:
                centers[i] = ncenters[i]
                updated = True
        # if there was no update, all points are stably clustered; break the loop
        if not updated:
            break
        print centers

    print 'final', centers
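One caveat when reading this code today: it is Python 2. Tuple-unpacking parameters such as lambda (s1,c1),(s2,c2): ... were removed in Python 3 (PEP 3113), so a Python 3 port would unpack by index instead, roughly:

ncenters = mappedPoints.reduceByKey(
        lambda a, b: (a[0] + b[0], a[1] + b[1])   # a, b are (sum, count) pairs
    ).map(
        lambda kv: (kv[0], kv[1][0] / kv[1][1])   # (index, sum / count)
    ).collectAsMap()

The print statements would likewise become print() calls.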
parseVector()
The method first splits the line (read from the file) by whitespace, then returns a Vector object.
def parseVector(line):
    return Vector(map(float, line.strip().split(' ')))
The Vector class can be found here.
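The linked file is not reproduced here, but to make the snippets in this post self-contained, a minimal Vector supporting just the operations used above (element-wise addition, division by a count, dist(), and squaredDist()) might look like this; it is my own sketch, not the DPark implementation:

import math

class Vector(object):
    def __init__(self, data):
        self.data = list(data)

    def __add__(self, o):
        # element-wise addition, used by reduceByKey() to sum points
        return Vector([a + b for a, b in zip(self.data, o.data)])

    def __div__(self, n):
        # divide every coordinate by a scalar: sum / count gives the mean point
        return Vector([a / float(n) for a in self.data])
    __truediv__ = __div__  # Python 3 name for the same operator

    def squaredDist(self, o):
        return sum((a - b) * (a - b) for a, b in zip(self.data, o.data))

    def dist(self, o):
        return math.sqrt(self.squaredDist(o))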
Note: map(function, iterable) from Python
applies the provided function to every item of the iterable and returns the results: a list in Python 2 (which this code targets), a lazy iterator in Python 3. In the function above, each piece of the split line is converted to a float.
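A quick illustration, with results shown as comments (wrapping in list() normalizes the Python 2 / Python 3 difference):

parts = '1.0 2.0 3.0'.strip().split(' ')  # ['1.0', '2.0', '3.0']
list(map(float, parts))                   # [1.0, 2.0, 3.0]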
closestCenter()
Assigns a point to the closest centroid. It compares squared Euclidean distances, which picks the same nearest centroid as the true Euclidean distance while skipping the square root.
def closestCenter(p, centers):
    bestDist = p.squaredDist(centers[0])
    bestIndex = 0
    for i in range(1, len(centers)):
        d = p.squaredDist(centers[i])
        if d < bestDist:
            bestDist = d
            bestIndex = i
    return bestIndex
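A quick usage example with two made-up centers:

centers = [Vector([0.0, 0.0]), Vector([10.0, 10.0])]
closestCenter(Vector([1.0, 2.0]), centers)   # -> 0: nearer to (0.0, 0.0)
closestCenter(Vector([8.0, 9.0]), centers)   # -> 1: nearer to (10.0, 10.0)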
squaredDist() is in the Vector class.
Note: zip() from Python
Pairs up the elements of the given iterables (as a list in Python 2, an iterator in Python 3). For example, in the squaredDist() method:
return sum((a-b)*(a-b) for a,b in zip(self.data, o.data))
self.data and o.data are the coordinates of two points, say (1.0, 2.0, 3.0) and (4.0, 5.0, 6.0); zip pairs them up as ((1.0, 4.0), (2.0, 5.0), (3.0, 6.0)).
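Plugging those two points into the expression gives the squared distance directly:

a = (1.0, 2.0, 3.0)
b = (4.0, 5.0, 6.0)
list(zip(a, b))                               # [(1.0, 4.0), (2.0, 5.0), (3.0, 6.0)]
sum((x - y) * (x - y) for x, y in zip(a, b))  # (1-4)^2 + (2-5)^2 + (3-6)^2 = 27.0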