Sunday, May 31, 2015

K-means algorithm

K-means is a standard clustering method in machine learning. It aims to partition n observations in d dimensions into k clusters such that each observation belongs to the cluster with the nearest mean.
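Formally, given clusters $C_1, \dots, C_k$ with means $\mu_1, \dots, \mu_k$, k-means seeks the partition minimizing the within-cluster sum of squared distances:

$$\underset{C_1,\dots,C_k}{\arg\min}\ \sum_{j=1}^{k} \sum_{x \in C_j} \lVert x - \mu_j \rVert^2$$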

The algorithm presented here is the standard Lloyd's algorithm. The implementation is not mine: it comes from Dpark, which ships it as an example. Dpark is a Python clone of Spark written by a group of enthusiastic developers. Even though Spark now provides PySpark as its Python API, this project is still an impressive one, and enough credit should be given. :)

Note: this is not a tutorial on Spark or Dpark. For more information about Spark in Python, check the PySpark documentation or the Dpark manual (in Chinese).

The original source code is from here.

The Lloyd algorithm

  • Input: 
    • a set of n data points
    • k randomly initialized centroids
  • Repeat until convergence:
    • assign each point to the nearest centroid based on Euclidean distance
    • update each cluster centroid to be the mean of all the points assigned to it (see the plain-Python sketch below)
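
As a rough illustration, here is my own minimal, non-distributed sketch of one Lloyd iteration in plain Python (this is not the Dpark code, which follows later):

def lloyd_step(points, centers):
    # assignment step: index of the nearest centroid for each point
    assign = [min(range(len(centers)),
                  key=lambda j: sum((a - b) ** 2
                                    for a, b in zip(p, centers[j])))
              for p in points]
    # update step: each centroid becomes the mean of its assigned points
    new_centers = []
    for j in range(len(centers)):
        members = [p for p, a in zip(points, assign) if a == j]
        if members:
            new_centers.append([sum(c) / float(len(c)) for c in zip(*members)])
        else:
            new_centers.append(centers[j])  # keep an empty cluster's centroid
    return new_centers

# e.g. lloyd_step([[1.0, 1.0], [1.2, 0.8], [5.0, 5.0], [5.1, 4.9]],
#                 [[0.0, 0.0], [6.0, 6.0]])
# -> [[1.1, 0.9], [5.05, 4.95]] (up to float rounding)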

The main function

Here are the Spark methods used in the main function. The link on each function name points to the PySpark documentation; the second link points to the Dpark source.
textFile(): reads a file from the file system. src
cache(): caches this RDD in memory. src
map(): applies the provided function to each element of the RDD and returns a new RDD. src
In the K-means implementation:

mappedPoints = points.map(lambda p: (closestCenter(p, centers), (p, 1)))

maps each point p through closestCenter(p, centers), which assigns the point to the closest center, and returns a key-value pair, where the key is the centroid index and the value is the tuple (point, 1), with 1 being the count.
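
To make the emitted pair concrete, here is one made-up example (Vector and closestCenter are shown later in this post; the numbers are hypothetical):

centers = [Vector([0.0, 0.0, 0.0, 0.0]), Vector([10.0, 10.0, 10.0, 10.0])]
p = Vector([1.0, 2.0, 1.0, 2.0])
pair = (closestCenter(p, centers), (p, 1))
# pair == (0, (Vector([1.0, 2.0, 1.0, 2.0]), 1)): key 0, value (point, count)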

reduceByKey(): merges the values for each key using the given function. src
In the K-means implementation:

mappedPoints.reduceByKey(
                lambda (s1,c1),(s2,c2): (s1+s2,c1+c2)
            )

aggregates the mappedPoints pairs by key, summing the points and the counts, and returns (centroid index, (sum of points, sum of counts)).
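
In plain Python, merging two values for the same key works like this (assuming, as the example's Vector class does, that + adds vectors element-wise; the numbers are made up):

(s1, c1) = (Vector([1.0, 2.0, 1.0, 2.0]), 1)  # partial sum of one point
(s2, c2) = (Vector([3.0, 0.0, 3.0, 0.0]), 2)  # partial sum of two points
merged = (s1 + s2, c1 + c2)
# merged == (Vector([4.0, 2.0, 4.0, 2.0]), 3)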

collectAsMap(): returns the key-value pairs in this RDD to the master as a dictionary. src
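
So after the reduce and the final map, collectAsMap() hands back an ordinary dict of (centroid index, new center); roughly this, with hypothetical values:

ncenters = {0: Vector([0.1, 0.2, 0.3, 0.4]),
            1: Vector([5.0, 5.1, 4.9, 5.2]),
            2: Vector([9.8, 9.9, 10.0, 10.1])}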


import random

from dpark import DparkContext

# the Dpark context, set up as in the original example
dpark = DparkContext()

# parseVector, closestCenter and the Vector class are defined below

if __name__ == '__main__':
    # initialization
    D = 4  # d dimensions
    K = 3  # number of clusters
    IT = 10  # max number of iterations
    MIN_DIST = 0.01  # threshold for convergence
    # initialize k random centroids
    centers = [Vector([random.random() for j in range(D)]) for i in range(K)]
    # read the data points from file and parse them to vectors
    points = dpark.textFile('kmeans_data.txt').map(parseVector).cache()

    # iteration
    for it in range(IT):
        print 'iteration', it
        # assign each point to the closest centroid
        # return (centroid index, (point, count = 1))
        mappedPoints = points.map(lambda p: (closestCenter(p, centers), (p, 1)))
        # calculate the new center of the points within the cluster
        # reduceByKey() returns (centroid index, (sum of points, sum of counts))
        # then map the output to (centroid index, sum of points/sum of counts)
        # sum of points/sum of counts is the new center of the cluster
        ncenters = mappedPoints.reduceByKey(
                lambda (s1,c1),(s2,c2): (s1+s2,c1+c2)
            ).map(
                lambda (id, (sum, count)): (id, sum/count)
            ).collectAsMap()

        updated = False
        # update the new center
        for i in ncenters:
            # if the distance between the center and the new center is greater
            # than MIN_DIST, update the center to new center
            if centers[i].dist(ncenters[i]) > MIN_DIST:
                centers[i] = ncenters[i]
                updated = True
        # if there is no update, then all points are clustered
        # break the loop
        if not updated:
            break
        print centers

    print 'final', centers
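
For reference, kmeans_data.txt is expected to contain one point per line as whitespace-separated floats, four columns here to match D = 4. These sample rows are made up:

0.0 0.0 0.0 0.1
0.1 0.2 0.0 0.0
9.0 9.1 8.9 9.0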




parseVector()

The method first splits the line (read from the file) by whitespace, then returns a Vector object.

def parseVector(line):
    return Vector(map(float, line.strip().split(' ')))

The Vector class can be found here.

Note: map(function, iterable) from Python
applies the provided function to every item of the iterable; in Python 2 it returns a new list (in Python 3, an iterator). In the function above, each token of the split line is mapped to a float.
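
For example (Python 2):

print map(float, '1.5 2.5 3.5'.split(' '))  # [1.5, 2.5, 3.5]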



closestCenter()

Assigns the point to the closest centroid based on the Euclidean distance. Note that it compares squared distances; this avoids the square root and yields the same nearest centroid.

def closestCenter(p, centers):
    bestDist = p.squaredDist(centers[0])
    bestIndex = 0
    for i in range(1, len(centers)):
        d = p.squaredDist(centers[i])
        if d < bestDist:
            bestDist = d
            bestIndex = i
    return bestIndex
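
For example, with two well-separated centers (hypothetical values):

centers = [Vector([0.0, 0.0, 0.0, 0.0]), Vector([10.0, 10.0, 10.0, 10.0])]
print closestCenter(Vector([1.0, 1.0, 1.0, 1.0]), centers)  # 0
print closestCenter(Vector([9.0, 9.0, 9.0, 9.0]), centers)  # 1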

squaredDist() is in the Vector class.

Note: zip() from Python
Makes a sequence that aggregates the corresponding elements from each of the iterables. For example, in the squaredDist() method:

 return sum((a-b)*(a-b) for a,b in zip(self.data, o.data))

self.data and o.data are coordinates of points, say (1.0, 2.0, 3.0) and (4.0, 5.0, 6.0); in Python 2, zip returns the list [(1.0, 4.0), (2.0, 5.0), (3.0, 6.0)].
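
A quick standalone check of the same computation with plain lists:

a = [1.0, 2.0, 3.0]
b = [4.0, 5.0, 6.0]
print sum((x - y) * (x - y) for x, y in zip(a, b))  # 27.0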

