Thursday, December 24, 2015

Spark Streaming Note: How to parse configuration correctly

I was testing Spark Streaming when I encountered the following problem:

Considering the following example:


public class StreamingWorking {

 public static void main(String[] args) {
  String topic = "test";
  int numThreads = 1;
  String checkpointDirectory = "/home/temp";
  String zookeeper = "localhost:2181";
  String consumerGroupId = "streaming_test";
  SparkConf sparkConf = new SparkConf().setAppName("StreamingTest");
  sparkConf.setMaster("spark://localhost:7077");
  sparkConf.setSparkHome("/home/spark");
  String[] jars = {"/home/spark/spark_try.jar"};
  sparkConf.setJars(jars);
  JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(2));
  context.checkpoint(checkpointDirectory);
  sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
  Map<String, Integer> topics = new HashMap<String, Integer>();
  topics.put(topic, numThreads);
  JavaPairInputDStream messages = KafkaUtils.createStream(context, zookeeper, 
    consumerGroupId, topics, StorageLevel.MEMORY_AND_DISK_SER());
  JavaDstream lines = messages.map(new Function, String>() {
   public String call(Tuple2 tuple2) {
    return tuple2._2();
   }
  });
  lines.print();
  context.start();
  context.awaitTermination();
 }
}


It works perfectly fine if you compile and run.
However, if we want to process the incoming message based on some input configurations:

public class StreamingNotWorking {
 private static Config config1;
 private static Properties configFile;

 public StreamingNotWorking(Config config1, Properties configFile) {
  Streaming.config1 = config1;
  Streaming.configFile = configFile;
 }
 public void runStreaming() {
  String topic = configFile.getProperty("topic");
  int numThreads = Integer.parseInt(configFile.getProperty("topic"));
  String checkpointDirectory = configFile.getProperty("checkpointDirectory");
  String zookeeper = configFile.getProperty("zookeeper");
  String consumerGroupId = configFile.getProperty("consumerGroupId");
  SparkConf sparkConf = new SparkConf().setAppName("StreamingTest");
  sparkConf.setMaster(configFile.getProperty("sparkMaster"));
  sparkConf.setSparkHome(configFile.getProperty("sparkHome"));
  String[] jars = {"/home/spark/spark_try.jar"};
  sparkConf.setJars(jars);
  JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(2));
  context.checkpoint(checkpointDirectory);
  sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
  Map topics = new HashMap();
  topics.put(topic, numThreads);
  JavaPairInputDStream messages = KafkaUtils.createStream(context, zookeeper, 
    consumerGroupId, topics, StorageLevel.MEMORY_AND_DISK_SER());
  JavaDstream lines = messages.map(new Function, String>() {
   public String call(Tuple2 tuple2) {
    return tuple2._2();
   }
  });
  JavaDstream processedMessage = lines.map(new Function() {
   public ProcessingMessage call(String lines) {
    return  new ProcessingMessage(lines, config1);
   }
  });
  processedMessage.print();
  context.start();
  context.awaitTermination();
 }
 
 private class ProcessingMessage {
  private static Config config1;
  public ProcessingMessage(String lines, Config config1) {
       ProcessingMessage.config1 = config1;
  }

}

/** Entry point for the {@code StreamingNotWorking} example. */
public class StreamingMain {
 /**
  * Loads the properties resource named by the first argument from the
  * classpath, builds the {@code Config} in the driver and starts the job.
  *
  * @param args {@code args[0]} is the classpath name of the properties file
  */
 public static void main(String[] args) {
  if (args.length < 1) {
   System.err.println("Usage: StreamingMain <properties resource on the classpath>");
   return;
  }
  try {
   Properties configFile = new Properties();
   java.io.InputStream in = StreamingMain.class.getClassLoader().getResourceAsStream(args[0]);
   if (in == null) {
    // getResourceAsStream signals "not found" with null, not with an exception.
    throw new IllegalArgumentException("Resource not found on classpath: " + args[0]);
   }
   try {
    // load() throws the checked IOException, so it has to live inside the try.
    configFile.load(in);
   } finally {
    in.close();
   }
   Config config = new Config(configFile.getProperty("config"));
   new StreamingNotWorking(config, configFile).runStreaming();
  } catch (Exception e) {
   System.err.println(e);
  }
 }
}

The above code will throw NullPointerException on the Config class. I spent lots of fruitless days trying to figure out where the problem is. Then my colleague enlightened me:


public class StreamingWorkingAgain {

 public void runStreaming(String configFileString) {
  Properties configFile = new Properties();
  configFile.load(this.getClass().getClassLoader().getResourceAsStream(configFileString));
  String topic = configFile.getProperty("topic");
  int numThreads = Integer.parseInt(configFile.getProperty("topic"));
  String checkpointDirectory = configFile.getProperty("checkpointDirectory");
  String zookeeper = configFile.getProperty("zookeeper");
  String consumerGroupId = configFile.getProperty("consumerGroupId");
  SparkConf sparkConf = new SparkConf().setAppName("StreamingTest");
  sparkConf.setMaster(configFile.getProperty("sparkMaster"));
  sparkConf.setSparkHome(configFile.getProperty("sparkHome"));
  String[] jars = {"/home/spark/spark_try.jar"};
  sparkConf.setJars(jars);
  JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(2));
  context.checkpoint(checkpointDirectory);
  sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
  Map topics = new HashMap();
  topics.put(topic, numThreads);
  JavaPairInputDStream messages = KafkaUtils.createStream(context, zookeeper, 
    consumerGroupId, topics, StorageLevel.MEMORY_AND_DISK_SER());
  JavaDstream lines = messages.map(new Function, String>() {
   public String call(Tuple2 tuple2) {
    return tuple2._2();
   }
  });
  JavaDstream processedMessage = lines.map(new Function() {
   public ProcessingMessage call(String lines) {
    return  new ProcessingMessage(lines, configFileString);
   }
  });
  lines.print();
  context.start();
  context.awaitTermination();
 }
 
 private class ProcessingMessage {
                private static Config config1;
  public ProcessingMessage(String lines, String configFileString) {
   Properties config = new Properties();
   try {
    config.load(this.getClass().getClassLoader().getResourceAsStream(configFileString));
   } catch (IOException e) {
    e.printStackTrace();
   }
   ProcessingMessage.config1 = new Config(config.getProperty("config1"));
  }
 }

}

/** Entry point for the {@code StreamingWorkingAgain} example. */
public class StreamingMain {
 /**
  * Starts the streaming job, handing it the name of the properties resource.
  *
  * @param args {@code args[0]} is the classpath name of the properties file
  */
 public static void main(String[] args) {
  final String resourceName = args[0];
  try {
   StreamingWorkingAgain job = new StreamingWorkingAgain();
   job.runStreaming(resourceName);
  } catch (Exception failure) {
   System.err.println(failure);
  }
 }
}

The only difference is that, instead of passing the Properties object ("configFile") and the Config object ("config1"), only a String ("configFileString") is passed into the ProcessingMessage class when creating the JavaDStream object. So why does it work?


In Spark, the SparkContext is created in the driver program (see the figure below), which is the following part:

SparkConf sparkConf = new SparkConf().setAppName("StreamingTest");
  sparkConf.setMaster(configFile.getProperty("sparkMaster"));
  sparkConf.setSparkHome(configFile.getProperty("sparkHome"));
  String[] jars = {"/home/spark/spark_try.jar"};
  sparkConf.setJars(jars);
  // Set before the context is created: Spark clones the SparkConf at that
  // point, so settings added afterwards are ignored.
  sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
  JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(2));
  context.checkpoint(checkpointDirectory);


Source: http://spark.apache.org/docs/latest/cluster-overview.html


After that, the Spark configuration and the processing functions are serialized and sent to the worker nodes, where all message processing is done. Static fields are not part of a serialized object, so if the Config is a static object that was only assigned in the driver program, it is not initialized again on the worker node; it is still null there, and we see the NullPointerException.

Instead, we can pass the configFile string to the worker node, and initialize the Config on the worker node. Since Config is static in the ProcessingMessage class, it will only be initialized once.



* The code is just an example; it may not compile as-is.

Monday, December 21, 2015

The end, and a new start

Finally my thesis is accepted this morning. Now I am officially a Ph.D, which is useless to every part of my current life.

So ironic, and pathetic.  

The same time last year, the weather is similar, but much colder. I probably was on campus, just headed back from Mashall st. to get another cup of coffee, and started practicing Leetcode. I was struggling and scared at that time, and not knowing what would happen the next year. But I refused to give up.

Now I am sitting in my apartment in CA, enjoying my first day of the two-week vacation, and writing this blog. I am happy.

Half a year after I moved to Bay Area, the words I said most to people is "This is the best decision I have ever made in my life". I am glad to have the life I am having now, and I am grateful.

My job searching was not that smooth. I only got two onsite interviews. On the day I got rejected by the first company, I was devastated. I texted my friend: "this was a huge humiliation in my life and I will never let it happen again." He texted back:"This is just part of your life journey, and soon you will realize it's nothing".

He was right. Two days later I got an offer from my current company. The next couple months became the best time in my life after my high school. My colleagues are friendly, the projects are fun, I have met lots of new friends and, life is awesome.

The first month in my new company, all I was thinking about was quitting it and finding a new job in a "better" company. My company is not one of those famous big companies in Silicon Valley. It is a game company and I don't even play games very often. It's a startup which only provides options but no actual stock (earning big once it goes IPO is another story). Those are the reasons I wanted to leave. But three months later, I started to think again. Apart from the above reasons, I have everything I want for a job: a reasonable boss, a great mentor, friendly colleagues, fun projects, lots of things I can learn, decent salary and good benefits. I ask myself, is it that important to get to one of those "famous" companies? I may not have so much fun in my job and the people around may not be as nice as the ones I have now. I am not sure what decision I will make next year, but I can always have a try.

I always thought I can only get the second best thing, it's my destiny. Now I think nothing is in fact perfect. If I got into the first company, which was my dream company, I probably end up in a boring job. Everything is "not perfect, but good enough". All I have to do is to be grateful about what I have and try my best to get what I want.


I always tried to grasp whatever I wanted. I thought if I tried harder, or used another way, it might work. Now I think it might not be the case. Everything is about the "right" thing at the "right" moment. I was too stubborn. Now I always ask myself, how can I be so sure that it is the "right" thing? And if I cannot convince myself, it probably also is not the right time to pursue it.

Yet there is no such thing as perfect timing, and at some point I have to make decisions. It probably is true that I should always wait until I gather enough information. But when? My friend said, if you cannot make a decision, then wait until the last moment, and follow your heart, because that would always be what you want. I can't disagree, but I don't agree. Betting my life on randomness is definitely not who I am. I prefer doing more analysis and making rational decisions. However, life is indeed balanced. I choose something, and I always lose another. So once I make up my mind, I should not regret it.

2015 is a year of life transition. I experienced and learned so many things. It's the end of the pathetic-graduate-student era, but the start as a self-independent Shirley. 2016 would be another exciting year. I am not sure if I will successfully be granted the visa and stay in the States. I may be deprived of what I have right now. It's scary, but if that happens, I will (have to) figure out a way.

Merry Christmas and Happy holidays, to everyone.


Sunday, October 18, 2015

Scala note 7: Water pouring problem

The problem is described in the following video:


It is solved using BFS approach.


/** Breadth-first solver for the water pouring puzzle.
 *
 *  @param capacity capacity of each glass; the index is the glass number
 */
class Pouring (capacity: Vector[Int]){

  // States: for each glass (Glass) how much water is in
  //size of state is number of glasses, value represents the volume
  //of the water in that glass
  type State = Vector[Int]
  val initialState = capacity map(x => 0)

  //Moves
  trait Move {
    //take a state and return a new state
    def change(state: State): State
  }
  //empty the glass
  case class Empty(glass: Int) extends Move {
    //updated: a copy of the list with one single replaced element
    def change(state: State) = state updated (glass, 0)
  }
  //fill the glass from source
  case class Fill(glass:Int) extends Move {
    def change(state: State) = state updated(glass, capacity(glass))
  }
  //pour from one glass to another
  case class Pour(from: Int, to: Int) extends Move {
    def change(state: State) = {
      //limited by what the source holds and by the free room in the target
      val amount = state(from) min (capacity(to) - state(to))
      state updated (from, state(from) - amount) updated(to, state(to) + amount)
    }
  }
  //all glasses
  val glasses = 0 until capacity.length

  // all possible moves
  val moves =
    (for (g <- glasses) yield Empty(g)) ++
    (for (g <- glasses) yield Fill(g)) ++
    (for (from <- glasses; to <- glasses if from != to) yield Pour(from, to))

  //paths: sequence of moves
  //history: the moves taken so far, most recent move first
  //endState: the state reached after applying the whole history; it is carried
  //along with the path instead of being recomputed from the history each time
  class Path(history: List[Move], val endState: State) {
    // extend another move
    //new endState: move that affects the old endState
    def extend(move: Move) = new Path(move :: history, move change endState)
    override def toString = (history.reverse mkString " ") + " -->" + endState
  }

  val initialPath = new Path(Nil, initialState)
  //explored: the states that have been visited
  def from(paths: Set[Path], explored: Set[State]): Stream[Set[Path]] =
    if (paths.isEmpty) Stream.empty
    //generate all the possible new paths
    else {
      val more = for {
        path <- paths
        //next path by extending current path
        //for all possible moves
        next <- moves map path.extend
        if !(explored contains next.endState)
      } yield next
      paths #:: from(more, explored ++(more map(_.endState)))
    }

  //all possible paths
  //the first element gives the initial path set
  //the set of all possible paths at length 1
  val pathSets = from(Set(initialPath), Set(initialState))

  //the volume we want to see in one of glasses
  def solutions(target: Int): Stream[Path] =
    for {
      pathSet <- pathSets
      path <- pathSet
      if path.endState contains target
    } yield path
}

Code on git.

Reference:
[1] Coursera.

Scala note 6: Bloxorz

Notes: val, lazy val and def:

// Demonstrates when the right-hand sides of val, lazy val and def are evaluated.
def expr = {
    val x = { print("x"); 1}        // evaluated once, right here at the definition
    lazy val y = { print("y"); 2}   // evaluated once, the first time y is used
    def z = { print ("z"); 3}       // evaluated again on every use of z
    z + y + x + z + y + x           // prints z, y, then z again; x and the second y print nothing
}
expr

The above program prints "xzyz" while evaluating expr. That is because a def is re-evaluated every time it is called, a lazy val is evaluated the first time it is used and its value is then kept, and a val is evaluated when it is defined and its value is also kept. Thus, when expr is evaluated, x is defined and evaluated first, while y and z are defined but not evaluated. Then z and y are called and evaluated, but x is only read. Then z is evaluated again, followed by reading y and x. Every time one of them is evaluated, its name is printed, thus we get the result "xzyz".


Lift: defined in PartialFunction, which returns a function that takes some argument x to Some(this(x)) if this is defined for x and None otherwise. Applications in sequences:


scala> val list = List("Shirley", "Dora", "Jeimee")
list: List[String] = List(Shirley, Dora, Jeimee)

scala> list.lift(0)
res1: Option[String] = Some(Shirley)

scala> list.lift(3)
res2: Option[String] = None

scala> list.lift(0).map ("I love " + _) getOrElse "you"
res3: String = I love Shirley

scala> list.lift(3).map ("I love " + _) getOrElse "you"
res4: String = you





This week's problem focuses on the game Bloxorz

Bloxorz: InstructionsHelp Center

Attention: You are allowed to submit a maximum of 5 times! for grade purposes. Once you have submitted your solution, you should see your grade and a feedback about your code on the Coursera website within 10 minutes. If you want to improve your grade, just submit an improved solution. The best of all your first 5 submissions will count as the final grade. You can still submit after the 5th time to get feedbacks on your improved solutions, however, these are for research purposes only, and will not be counted towards your final grade.
Download the streams.zip handout archive file and extract it somewhere on your machine.
In this assignment you will implement a solver for a simplified version of a Flash game named “Bloxorz” using streams and lazy evaluation.
As in the previous assignments, you are encouraged to look at the Scala API documentation while solving this exercise, which can be found here:

Bloxorz

bloxorz
Bloxorz is a game in Flash, which you can access here. As a first step for this assignment, play it for a few levels.
The objective of Bloxorz is simple; you must navigate your rectangular block to the hole at the end of the board, by rolling it, in the fewest number of moves possible. A block can be moved in 4 possible directions, left, right, up, down, using the appropriate keys on the keyboard.
You will quickly notice that for many levels, you are, in your head, trying to walk through different configurations/positions of where the block can be in order to reach it to the goal position. Equipped with some new programming skills, you can now let your computer do the work!
The idea of this assignment is to code a solver for a simplified version of this game, with no orange tiles, circles or crosses on the terrain. The goal of your program, given a terrain configuration with a start position and a goal position, is to return the exact sequence of keys to type in order to reach the goal position. Naturally, we will be interested in getting the shortest path as well.

State-space Exploration

The theory behind coding a solver for this game is in fact applicable to many different problems. The general problem we are trying to solve is the following:
  • We start at some initial state S, and we are trying to reach an end state T.
  • From every state, there are possible transitions to other states, some of which are out of bounds.
  • We explore the states, starting from S, by exploring its neighbors and following the chain, until we reach T. There are different ways of exploring the state space. On the two ends of the spectrum are the following techniques:
    • depth-first search: when we see a new state, we immediately explore its direct neighbors, and we do this all the way down, until we reach a roadblock. Then we backtrack until the first non-explored neighbor, and continue in the same vein.
    • breadth-first search: here, we proceed more cautiously. When we find the neighbors of our current state, we explore each of them for each step. The respective neighbors of these states are then stored to be explored at a later time.

Game Setup

Let us start by setting up our platform. The trait GameDef will contain all the logic regarding how the terrain is setup, the blocks are represented and how they move.

Positions

A position on the game board is represented using the case class Pos(x:Int, y:Int), where x and y represent its coordinates. The scaladoc comment on class Pos explains how to interpret the coordinates:
  • The x coordinate denotes the position on the vertical axis
  • The y coordinate is used for the horizontal axis
  • The coordinates increase when moving down and right
Illustration:
  0 1 2 3   <- y axis
0 o o o o
1 o o o o
2 o # o o    # is at position Pos(2, 1)
3 o o o o

^
|

x axis

The Terrain

We represent our terrain as a function from positions to booleans:
type Terrain = Pos => Boolean
The function returns true for every position that is inside the terrain. Terrains can be created easily from a string representation using the methods in the file StringParserTerrain.scala.
Your first task is to implement two methods in trait StringParserTerrain that are used to parse the terrain and the start / end positions. The Scaladoc comments give precise instructions on how they should be implemented.
def terrainFunction(levelVector: Vector[Vector[Char]]): Pos => Boolean = ???
def findChar(c: Char, levelVector: Vector[Vector[Char]]): Pos = ???


/** Builds the terrain predicate for a level.
 *
 *  The returned function is true only when `pos` lies inside `levelVector`
 *  (row `pos.x`, column `pos.y`) and the character there is not '-'.
 *  The column is looked up in the row actually addressed, so levels whose rows
 *  have different lengths are handled without an out-of-bounds access.
 */
def terrainFunction(levelVector: Vector[Vector[Char]]): Pos => Boolean = 
    (pos: Pos) => levelVector.lift(pos.x).flatMap(_.lift(pos.y)).exists(_ != '-')
/** Returns the position (row, column) of the first row containing `c`
 *  and the first occurrence of `c` within that row.
 */
def findChar(c: Char, levelVector: Vector[Vector[Char]]): Pos = {
      val rowIndex = levelVector indexWhere (line => line contains c)
      Pos(rowIndex, levelVector(rowIndex) indexOf c)
    }

Blocks

Back in the file GameDef.scala, a block is a 2 x 1 x 1 cuboid. We represent it as a case class which contains two fields, the 2d position of both the cubes which make up the block.
Block is therefore a case class Block(b1: Pos, b2: Pos), and can move in four different directions, each time yielding a new block. To this effect, the methods left, right, up and down are provided.
Given this, you can now define a method isStanding which tells us whether the Block is standing or not:
def isStanding: Boolean = ???
Next, implement a method isLegal on Block which tells us whether a block is on the terrain or off it:
def isLegal: Boolean = ???
  /** True when both cubes of the block occupy the same position. */
  def isStanding: Boolean = (b1.x == b2.x) && (b1.y == b2.y)

    /**
     * True when the terrain function accepts both positions of the block,
     * i.e. the block lies entirely on the terrain.
     */
    def isLegal: Boolean = List(b1, b2) forall (cube => terrain(cube))

Finally, we need to implement a method that constructs the initial block for our simulation, the block located at the start position:
def startBlock: Block = ???

/** The initial block: standing, with both cubes on the start position. */
def startBlock: Block = {
  val origin = startPos
  Block(origin, origin)
}

Moves and Neighbors

To record which moves we make when navigating the block, we represent the four possible moves as case objects:
// Closed set of moves: the class is sealed and the four case objects below
// are its only instances shown here.
sealed abstract class Move
case object Left  extends Move
case object Right extends Move
case object Up    extends Move
case object Down  extends Move
You can now implement the functions neighbors and legalNeighbors on Block, which return a list of tuples: the neighboring blocks, as well as the move to get there.
def neighbors: List[(Block,Move)] = ???
def legalNeighbors: List[(Block,Move)] = ???

/** All four neighboring blocks, each paired with the move that produces it. */
def neighbors: List[(Block, Move)] = 
      List(left -> Left, right -> Right, up -> Up, down -> Down)

    /**
     * The neighbors of this block whose block is legal, each still paired
     * with the move that leads to it.
     */
    def legalNeighbors: List[(Block, Move)] =
      for (candidate <- neighbors if candidate._1.isLegal) yield candidate

Solving the Game

Now that everything is set up, we can concentrate on actually coding our solver which is defined in the file Solver.scala.
We could represent a path to a solution as a Stream[Block]. We however also need to make sure we keep the history on our way to the solution. Therefore, a path is represented as a Stream[(Block, List[Move])], where the second part of the pair records the history of moves so far. Unless otherwise noted, the last move is the head element of the List[Move].
First, implement a function done which determines when we have reached the goal:
def done(b: Block): Boolean = ???

/** True when both cubes of `b` are on the goal position. */
def done(b: Block): Boolean = List(b.b1, b.b2) forall (_ == goal)

Finding Neighbors

Then, implement a function neighborsWithHistory, which, given a block, and its history, returns a stream of neighboring blocks with the corresponding moves.
def neighborsWithHistory(b: Block, history: List[Move]): Stream[(Block, List[Move])] = ???
As mentioned above, the history is ordered so that the most recent move is the head of the list. If you consider Level 1 as defined in Bloxorz.scala, then
neighborsWithHistory(Block(Pos(1,1),Pos(1,1)), List(Left,Up))
results in a stream with the following elements (given as a set):
Set(
  (Block(Pos(1,2),Pos(1,3)), List(Right,Left,Up)),
  (Block(Pos(2,1),Pos(3,1)), List(Down,Left,Up))
)
You should implement the above example as a test case in the test suite BloxorzSuite.
/** The legal neighbors of `b`, each with its move prepended to `history`
 *  (most recent move first).
 */
def neighborsWithHistory(b: Block, history: List[Move]): Stream[(Block, List[Move])] = {
    val extended = b.legalNeighbors map { case (block, move) => (block, move :: history) }
    extended.toStream
}

Avoiding Circles

While exploring a path, we will also track all the blocks we have seen so far, so as to not get lost in circles of movements (such as sequences of left-right-left-right). Implement a function newNeighborsOnly to this effect:
def newNeighborsOnly(neighbors: Stream[(Block, List[Move])],
                     explored: Set[Block]): Stream[(Block, List[Move])] = ???
Example usage:
newNeighborsOnly(
  Set(
    (Block(Pos(1,2),Pos(1,3)), List(Right,Left,Up)),
    (Block(Pos(2,1),Pos(3,1)), List(Down,Left,Up))
  ).toStream,

  Set(Block(Pos(1,2),Pos(1,3)), Block(Pos(1,1),Pos(1,1)))
)
returns
  Set(
    (Block(Pos(2,1),Pos(3,1)), List(Down,Left,Up))
  ).toStream
Again, you should convert this example into a test case.
/** Drops every neighbor whose block is already in `explored`. */
def newNeighborsOnly(neighbors: Stream[(Block, List[Move])],
                       explored: Set[Block]): Stream[(Block, List[Move])] = 
                         neighbors filterNot { case (block, _) => explored contains block }

Finding Solutions

Now to the crux of the solver. Implement a function from, which, given an initial stream and a set of explored blocks, creates a stream containing the possible paths starting from the head of the initial stream:
def from(initial: Stream[(Block, List[Move])],
         explored: Set[Block]): Stream[(Block, List[Move])] = ???
Note: pay attention to how the path is constructed: as discussed in the introduction, the key to getting the shortest path for the problem is to explore the space in a breadth-first manner.
Hint: The case study lecture about the water pouring problem (7.5) might help you.
  /** Breadth-first expansion: emits `initial`, followed by the expansion of
   *  the unexplored neighbors of every element of `initial`, level by level.
   */
  def from(initial: Stream[(Block, List[Move])],
           explored: Set[Block]): Stream[(Block, List[Move])] =
    if (initial.isEmpty) Stream.empty
    else {
      // next level: every not-yet-explored neighbor of every path in this level
      val nextLevel = initial flatMap { case (block, history) =>
        newNeighborsOnly(neighborsWithHistory(block, history), explored)
      }
      val seen = nextLevel map (_._1)
      initial #::: from(nextLevel, explored ++ seen)
    }

Putting Things together

Finally we can define a lazy val pathsFromStart which is a stream of all the paths that begin at the starting block:
lazy val pathsFromStart: Stream[(Block, List[Move])] = ???

/** All paths that begin at the start block, in breadth-first order.
 *
 *  The start block is recorded as explored from the beginning; otherwise the
 *  search could step back onto it and emit it again with a non-empty history.
 */
lazy val pathsFromStart: Stream[(Block, List[Move])] = 
    from(Stream((startBlock, List[Move]())), Set(startBlock))

We can also define pathToGoal which is a stream of all possible pairs of goal blocks along with their history. Indeed, there can be more than one road to Rome!
lazy val pathsToGoal: Stream[(Block, List[Move])] = ???

/** The paths from the start whose block satisfies `done`. */
lazy val pathsToGoal: Stream[(Block, List[Move])] =
  pathsFromStart filter { case (block, _) => done(block) }

To finish it off, we define solution to contain the (or one of the) shortest list(s) of moves that lead(s) to the goal.
Note: the head element of the returned List[Move] should represent the first move that the player should perform from the starting position.
lazy val solution: List[Move] = ???

/** Moves of the first path to the goal, in playing order (the history is
 *  stored most-recent-first, hence the reverse); empty when there is no path.
 */
lazy val solution: List[Move] =
  pathsToGoal.headOption.map(firstHit => firstHit._2.reverse).getOrElse(List.empty)

**The problem is solved using BFS approach.


Code check on git.

References (and special thanks):
[1]. codatlas
[2]. Scala partial functions
[3]. My friend's homework

Saturday, October 10, 2015

Scala note 5: Sentence Anagrams

Finally I made it work! It takes much longer than I thought and I am too exhausted to explain everything. The original problem set can be found here.

My friend said overall the class is easy. Not to me. 


package forcomp

import common._

object Anagrams {

  /** A word is simply a `String`. */
  type Word = String

  /** A sentence is a `List` of words. */
  type Sentence = List[Word]

  /** `Occurrences` is a `List` of pairs of characters and positive integers saying
   *  how often the character appears.
   *  This list is sorted alphabetically with respect to the character in each pair.
   *  All characters in the occurrence list are lowercase.
   *  
   *  Any list of pairs of lowercase characters and their frequency which is not sorted
   *  is **not** an occurrence list.
   *  
   *  Note: If the frequency of some character is zero, then that character should not be
   *  in the list.
   */
  type Occurrences = List[(Char, Int)]

  /** The dictionary is simply a list of words.
   *  It is predefined and obtained using the utility method `loadDictionary`.
   */
  val dictionary: List[Word] = loadDictionary

  /** Converts the word into its character occurrence list.
   *  
   *  Note: the uppercase and lowercase version of the character are treated as the
   *  same character, and are represented as a lowercase character in the occurrence list.
   */
  def wordOccurrences(w: Word): Occurrences = {
    // Bucket equal (lower-cased) characters together, then count the size of each bucket.
    val buckets = w.toLowerCase.groupBy(identity)
    buckets.map { case (ch, run) => (ch, run.length) }.toList.sorted
  }

  /** Converts a sentence into its character occurrence list. */
  def sentenceOccurrences(s: Sentence): Occurrences =
    // Glue all words into one string (no separator) and count its characters.
    wordOccurrences(s.mkString)

  /** The `dictionaryByOccurrences` is a `Map` from different occurrences to a sequence of all
   *  the words that have that occurrence count.
   *  This map serves as an easy way to obtain all the anagrams of a word given its occurrence list.
   *  
   *  For example, the word "eat" has the following character occurrence list:
   *
   *     `List(('a', 1), ('e', 1), ('t', 1))`
   *
   *  Incidentally, so do the words "ate" and "tea".
   *
   *  This means that the `dictionaryByOccurrences` map will contain an entry:
   *
   *    List(('a', 1), ('e', 1), ('t', 1)) -> Seq("ate", "eat", "tea")
   *
   */
  lazy val dictionaryByOccurrences: Map[Occurrences, List[Word]] =
    // Words that share an occurrence list end up under the same key.
    dictionary groupBy wordOccurrences

  /** Returns all the anagrams of a given word. */
  def wordAnagrams(word: Word): List[Word] =
    // A word whose letters match no dictionary entry simply has no anagrams: answer
    // with an empty list instead of letting the map lookup throw NoSuchElementException.
    dictionaryByOccurrences.getOrElse(wordOccurrences(word), Nil)

  /** Returns the list of all subsets of the occurrence list.
   *  This includes the occurrence itself, i.e. `List(('k', 1), ('o', 1))`
   *  is a subset of `List(('k', 1), ('o', 1))`.
   *  It also include the empty subset `List()`.
   * 
   *  Example: the subsets of the occurrence list `List(('a', 2), ('b', 2))` are:
   *
   *    List(
   *      List(),
   *      List(('a', 1)),
   *      List(('a', 2)),
   *      List(('b', 1)),
   *      List(('a', 1), ('b', 1)),
   *      List(('a', 2), ('b', 1)),
   *      List(('b', 2)),
   *      List(('a', 1), ('b', 2)),
   *      List(('a', 2), ('b', 2))
   *    )
   *
   *  Note that the order of the occurrence list subsets does not matter -- the subsets
   *  in the example above could have been displayed in some other order.
   */

  def combinations(occurrences: Occurrences): List[Occurrences] = {
    // For every character, the non-empty ways to pick it:
    // ('a', 2) becomes List(('a', 1), ('a', 2)).
    val ocs: List[Occurrences] = occurrences.map(occ => (1 to occ._2).map(o => (occ._1, o)).toList)
    // A character either stays out of a subset (l1 is kept as it is) or is appended,
    // with one of its possible counts, to every subset built so far.
    ocs.foldLeft(List[Occurrences](Nil))((l1, l2) =>
      l1 ::: (for (elem1 <- l1; elem2 <- l2) yield elem1 ::: List(elem2)))
  }

  /** Subtracts occurrence list `y` from occurrence list `x`.
   *
   *  The precondition is that the occurrence list `y` is a subset of the occurrence
   *  list `x` -- any character appearing in `y` must appear in `x`, and its frequency
   *  in `y` must be smaller than or equal to its frequency in `x`.
   *
   *  Note: the resulting value is an occurrence list - meaning it is sorted and has
   *  no zero-entries.
   *
   *  `List.diff` cannot be used here, because it only removes pairs that are equal
   *  as a whole. For x = List(('a', 3)) and y = List(('a', 1)):
   *
   *    subtract(x, y) => List(('a', 2))
   *    x diff y       => List(('a', 3))
   */
  def subtract(x: Occurrences, y: Occurrences): Occurrences =
    y.foldLeft(x) { (remaining, taken) =>
      // Lower the count of the matching character; every other pair passes through.
      remaining.map { kept =>
        if (kept._1 == taken._1) (kept._1, kept._2 - taken._2) else kept
      }
    }.filter(_._2 > 0).sorted
 
  /** Returns a list of all anagram sentences of the given sentence.
   *  
   *  An anagram of a sentence is formed by taking the occurrences of all the characters of
   *  all the words in the sentence, and producing all possible combinations of words with those characters,
   *  such that the words have to be from the dictionary.
   *
   *  The number of words in the sentence and its anagrams does not have to correspond.
   *  For example, the sentence `List("I", "love", "you")` is an anagram of the sentence `List("You", "olive")`.
   *
   *  Also, two sentences with the same words but in a different order are considered two different anagrams.
   *  For example, sentences `List("You", "olive")` and `List("olive", "you")` are different anagrams of
   *  `List("I", "love", "you")`.
   *  
   *  Here is a full example of a sentence `List("Yes", "man")` and its anagrams for our dictionary:
   *
   *    List(
   *      List(en, as, my),
   *      List(en, my, as),
   *      List(man, yes),
   *      List(men, say),
   *      List(as, en, my),
   *      List(as, my, en),
   *      List(sane, my),
   *      List(Sean, my),
   *      List(my, en, as),
   *      List(my, as, en),
   *      List(my, sane),
   *      List(my, Sean),
   *      List(say, men),
   *      List(yes, man)
   *    )
   *
   *  The different sentences do not have to be output in the order shown above - any order is fine as long as
   *  all the anagrams are there. Every returned word has to exist in the dictionary.
   *  
   *  Note: in case that the words of the sentence are in the dictionary, then the sentence is the anagram of itself,
   *  so it has to be returned in this list.
   *
   *  Note: There is only one anagram of an empty sentence.
   */
  
  def sentenceAnagrams(sentence: Sentence): List[Sentence] = {
    // Count every letter of the sentence once; the helper then recurses on those counts.
    val letters = sentenceOccurrences(sentence)
    sentenceAnagramsHelper(letters)
  }

   def sentenceAnagramsHelper(occurrences: Occurrences): List[Sentence] = occurrences match {
     // No letters left: the only anagram is the empty sentence.
     case Nil => List(Nil)
     case _ =>
       for {
         // Keep only the letter subsets that spell at least one dictionary word.
         subset <- combinations(occurrences)
         if dictionaryByOccurrences.contains(subset)
         word <- dictionaryByOccurrences(subset)
         // Build the rest of the sentence from the letters this word did not use.
         rest <- sentenceAnagramsHelper(subtract(occurrences, subset))
       } yield word :: rest
   }

Code is also on git.

References (and special thanks):
[1]. Codatlas;
[2]. chancila (gitHub).

Sunday, October 4, 2015

Scala note 4: Huffman Coding

This post is based on Coursera's Scala course homework for week 4 and week 5. The whole problem can be found here.

Some notes:

Case classes: they are regular classes which export their constructor parameters and which provide a recursive decomposition mechanism via pattern matching.
Pattern matching: allows matching on any sort of data with a first-match policy.

List.groupBy: Partitions this traversable collection into a map of traversable collections according to some function.

/: or foldLeft: applies a binary operator to a start value and all elements of this sequence, going left to right. (z /: xs) is the same as xs.foldLeft(z).

Map.getOrElse(key, default): Returns the value associated with a key, or a default value if the key is not contained in the map.

List.sortBy: sorts the sequence according to the Ordering which results from transforming an implicitly given Ordering with a transform function.

List.flatMap: Builds a new collection by applying a function to all elements of this list and flattening the resulting collections into a single list.

List.map: Builds a new collection by applying a function to all elements of this list.

See here for the difference between flatMap and map.

Documentation on List can be found here.


object Huffman {

  import scala.annotation.tailrec

  /**
   * A huffman code is represented by a binary tree.
   *
   * Every `Leaf` node of the tree represents one character of the alphabet that the tree can encode.
   * The weight of a `Leaf` is the frequency of appearance of the character.
   *
   * The branches of the huffman tree, the `Fork` nodes, represent a set containing all the characters
   * present in the leaves below it. The weight of a `Fork` node is the sum of the weights of these
   * leaves.
   */
  abstract class CodeTree
  case class Fork(left: CodeTree, right: CodeTree, chars: List[Char], weight: Int) extends CodeTree
  case class Leaf(char: Char, weight: Int) extends CodeTree



  // Part 1: Basics

  /** Returns the weight stored in the root node of `tree`. */
  def weight(tree: CodeTree): Int = tree match {
    case Fork(_, _, _, w) => w
    case Leaf(_, w) => w
  }

  /** Returns the characters stored in the root node of `tree`. */
  def chars(tree: CodeTree): List[Char] = tree match {
    case Fork(_, _, cs, _) => cs
    case Leaf(c, _) => List(c)
  }

  /**
   * Joins two trees under a new `Fork` whose characters are those of `left` followed by
   * those of `right`, and whose weight is the sum of both weights.
   */
  def makeCodeTree(left: CodeTree, right: CodeTree) =
    Fork(left, right, chars(left) ::: chars(right), weight(left) + weight(right))



  // Part 2: Generating Huffman trees

  /**
   * In this assignment, we are working with lists of characters. This function allows
   * you to easily create a character list from a given string.
   */
  def string2Chars(str: String): List[Char] = str.toList

  /**
   * This function computes for each unique character in the list `chars` the number of
   * times it occurs. For example, the invocation
   *
   *   times(List('a', 'b', 'a'))
   *
   * should return the following (the order of the resulting list is not important):
   *
   *   List(('a', 2), ('b', 1))
   *
   * The type `List[(Char, Int)]` denotes a list of pairs, where each pair consists of a
   * character and an integer. Pairs can be constructed easily using parentheses:
   *
   *   val pair: (Char, Int) = ('c', 1)
   *
   * In order to access the two elements of a pair, you can use the accessors `_1` and `_2`:
   *
   *   val theChar = pair._1
   *   val theInt  = pair._2
   *
   * Another way to deconstruct a pair is using pattern matching:
   *
   *   pair match {
   *     case (theChar, theInt) =>
   *       println("character is: "+ theChar)
   *       println("integer is  : "+ theInt)
   *   }
   */
  def times(chars: List[Char]): List[(Char, Int)] =
    // groupBy(identity) yields a map from each character to the list of its occurrences;
    // the length of that list is the character's frequency.
    chars.groupBy(identity).map { case (c, occurrences) => (c, occurrences.length) }.toList

  /**
   * Same result as `times`, computed by folding the characters into a `Map` of counters.
   */
  def times2(chars: List[Char]): List[(Char, Int)] = {
    def iterate(counts: Map[Char, Int], c: Char): Map[Char, Int] =
      counts + (c -> (counts.getOrElse(c, 0) + 1))

    chars.foldLeft(Map[Char, Int]())(iterate).toList
  }



  /**
   * Returns a list of `Leaf` nodes for a given frequency table `freqs`.
   *
   * The returned list should be ordered by ascending weights (i.e. the
   * head of the list should have the smallest weight), where the weight
   * of a leaf is the frequency of the character. Leaves of equal weight are
   * ordered by their character.
   */
  def makeOrderedLeafList(freqs: List[(Char, Int)]): List[Leaf] =
    freqs
      .sortBy { case (c, count) => (count, c) }
      .map { case (c, count) => Leaf(c, count) }


  /**
   * Checks whether the list `trees` contains only one single code tree.
   */
  def singleton(trees: List[CodeTree]): Boolean = trees match {
    case _ :: Nil => true
    case _ => false
  }

  /**
   * The parameter `trees` of this function is a list of code trees ordered
   * by ascending weights.
   *
   * This function takes the first two elements of the list `trees` and combines
   * them into a single `Fork` node. This node is then added back into the
   * remaining elements of `trees` at a position such that the ordering by weights
   * is preserved.
   *
   * If `trees` is a list of less than two elements, that list should be returned
   * unchanged.
   */
  def combine(trees: List[CodeTree]): List[CodeTree] = trees match {
    case left :: right :: rest =>
      // sortBy is stable, so the new fork lands in front of trees of the same weight.
      (makeCodeTree(left, right) :: rest).sortBy(weight)
    case _ => trees
  }

  /**
   * This function will be called in the following way:
   *
   *   until(singleton, combine)(trees)
   *
   * where `trees` is of type `List[CodeTree]`, `singleton` and `combine` refer to
   * the two functions defined above.
   *
   * In such an invocation, `until` applies `combine` again and again until `singleton`
   * accepts the list, and then returns that list.
   *
   * An empty list is returned unchanged: `combine` above leaves it as it is, so waiting
   * for it to become a singleton would never end.
   */
  @tailrec
  def until(singleton: List[CodeTree] => Boolean, combine: List[CodeTree] => List[CodeTree])
           (trees: List[CodeTree]): List[CodeTree] =
    if (trees.isEmpty || singleton(trees)) trees
    else until(singleton, combine)(combine(trees))

  /**
   * This function creates a code tree which is optimal to encode the text `chars`.
   *
   * The parameter `chars` is an arbitrary, non-empty text. This function extracts the
   * character frequencies from that text and creates a code tree based on them.
   *
   * Throws `IllegalArgumentException` when `chars` is empty, since no tree can be
   * built without at least one character.
   */
  def createCodeTree(chars: List[Char]): CodeTree = {
    require(chars.nonEmpty, "cannot build a code tree from an empty text")
    until(singleton, combine)(makeOrderedLeafList(times(chars))).head
  }




  // Part 3: Decoding

  type Bit = Int

  /**
   * This function decodes the bit sequence `bits` using the code tree `tree` and returns
   * the resulting list of characters.
   *
   * A bit equal to 0 selects the left branch of a `Fork`, any other value the right one.
   * Every time a `Leaf` is reached its character is emitted and the walk starts again
   * from the root of `tree`.
   *
   * Special cases:
   *  - empty `bits` with a `Fork` at the root decode to the empty list;
   *  - a tree that is a single `Leaf` decodes empty `bits` to that one character and
   *    rejects any other input with `IllegalArgumentException`, because such a tree
   *    has no branch that could consume a bit;
   *  - `bits` ending in the middle of a code raise `NoSuchElementException`.
   */
  def decode(tree: CodeTree, bits: List[Bit]): List[Char] = {
    // `decoded` is built in reverse order and turned around once at the very end.
    @tailrec
    def walk(node: CodeTree, rest: List[Bit], decoded: List[Char]): List[Char] = node match {
      case Leaf(c, _) =>
        if (rest.isEmpty) (c :: decoded).reverse
        else walk(tree, rest, c :: decoded) // restart at the root, not at this leaf
      case Fork(left, right, _, _) => rest match {
        case bit :: tail => walk(if (bit == 0) left else right, tail, decoded)
        case Nil => throw new NoSuchElementException("bit sequence ends in the middle of a code")
      }
    }

    tree match {
      case Leaf(c, _) =>
        require(bits.isEmpty, "a tree made of a single leaf cannot consume any bits")
        List(c)
      case _ =>
        if (bits.isEmpty) Nil else walk(tree, bits, Nil)
    }
  }

  /**
   * A Huffman coding tree for the French language.
   * Generated from the data given at
   *   http://fr.wikipedia.org/wiki/Fr%C3%A9quence_d%27apparition_des_lettres_en_fran%C3%A7ais
   */
  val frenchCode: CodeTree = Fork(Fork(Fork(Leaf('s',121895),Fork(Leaf('d',56269),Fork(Fork(Fork(Leaf('x',5928),Leaf('j',8351),List('x','j'),14279),Leaf('f',16351),List('x','j','f'),30630),Fork(Fork(Fork(Fork(Leaf('z',2093),Fork(Leaf('k',745),Leaf('w',1747),List('k','w'),2492),List('z','k','w'),4585),Leaf('y',4725),List('z','k','w','y'),9310),Leaf('h',11298),List('z','k','w','y','h'),20608),Leaf('q',20889),List('z','k','w','y','h','q'),41497),List('x','j','f','z','k','w','y','h','q'),72127),List('d','x','j','f','z','k','w','y','h','q'),128396),List('s','d','x','j','f','z','k','w','y','h','q'),250291),Fork(Fork(Leaf('o',82762),Leaf('l',83668),List('o','l'),166430),Fork(Fork(Leaf('m',45521),Leaf('p',46335),List('m','p'),91856),Leaf('u',96785),List('m','p','u'),188641),List('o','l','m','p','u'),355071),List('s','d','x','j','f','z','k','w','y','h','q','o','l','m','p','u'),605362),Fork(Fork(Fork(Leaf('r',100500),Fork(Leaf('c',50003),Fork(Leaf('v',24975),Fork(Leaf('g',13288),Leaf('b',13822),List('g','b'),27110),List('v','g','b'),52085),List('c','v','g','b'),102088),List('r','c','v','g','b'),202588),Fork(Leaf('n',108812),Leaf('t',111103),List('n','t'),219915),List('r','c','v','g','b','n','t'),422503),Fork(Leaf('e',225947),Fork(Leaf('i',115465),Leaf('a',117110),List('i','a'),232575),List('e','i','a'),458522),List('r','c','v','g','b','n','t','e','i','a'),881025),List('s','d','x','j','f','z','k','w','y','h','q','o','l','m','p','u','r','c','v','g','b','n','t','e','i','a'),1486387)

  /**
   * What does the secret message say? Can you decode it?
   * For the decoding use the `frenchCode` Huffman tree defined above.
   */
  val secret: List[Bit] = List(0,0,1,1,1,0,1,0,1,1,1,0,0,1,1,0,1,0,0,1,1,0,1,0,1,1,0,0,1,1,1,1,1,0,1,0,1,1,0,0,0,0,1,0,1,1,1,0,0,1,0,0,1,0,0,0,1,0,0,0,1,0,1)

  /**
   * Returns the secret message decoded with `frenchCode`.
   */
  def decodedSecret: List[Char] =
    decode(frenchCode, secret)



  // Part 4a: Encoding using Huffman tree

  /**
   * This function encodes `text` using the code tree `tree`
   * into a sequence of bits.
   *
   * Every character of `text` is expected to occur in `tree`; this is not checked, and a
   * character missing from the tree is silently encoded by following right branches.
   * A tree that is a single `Leaf` encodes every character as the empty bit list.
   */
  def encode(tree: CodeTree)(text: List[Char]): List[Bit] = {
    // Walks down from `node`, emitting 0 for a step to the left and 1 for a step to the right.
    def encodeChar(node: CodeTree)(char: Char): List[Bit] = node match {
      case Leaf(_, _) => List()
      case Fork(left, right, _, _) =>
        // The branch is chosen by the character being encoded, not by the head of the text.
        if (chars(left).contains(char)) 0 :: encodeChar(left)(char)
        else 1 :: encodeChar(right)(char)
    }
    text flatMap (encodeChar(tree))
  }


  // Part 4b: Encoding using code table

  type CodeTable = List[(Char, List[Bit])]

  /**
   * This function returns the bit sequence that represents the character `char` in
   * the code table `table`.
   *
   * A character without an entry makes the list lookup fail with an
   * `IndexOutOfBoundsException`, because `indexWhere` answers -1 in that case.
   */
  def codeBits(table: CodeTable)(char: Char): List[Bit] =
    table(table.indexWhere(x => x._1 == char))._2

  /**
   * Given a code tree, create a code table which contains, for every character in the
   * code tree, the sequence of bits representing that character.
   *
   * The table of a `Fork` is built from the tables of its two sub-trees: every code of the
   * left table gets a leading 0 and every code of the right table a leading 1.
   */
  def convert(tree: CodeTree): CodeTable = tree match {
    case Leaf(char, _) => List((char, List()))
    case Fork(left, right, _, _) => mergeCodeTables(convert(left), convert(right))
  }

  /**
   * This function takes two code tables and merges them into one: the entries of `a`
   * prefixed with bit 0, followed by the entries of `b` prefixed with bit 1.
   */
  def mergeCodeTables(a: CodeTable, b: CodeTable): CodeTable =
    a.map { case (c, code) => (c, 0 :: code) } ::: b.map { case (c, code) => (c, 1 :: code) }

  /**
   * This function encodes `text` according to the code tree `tree`.
   *
   * To speed up the encoding process, it first converts the code tree to a code table
   * and then uses it to perform the actual encoding.
   */
  def quickEncode(tree: CodeTree)(text: List[Char]): List[Bit] = {
    val table = convert(tree) // built once, then reused for every character
    text flatMap (codeBits(table))
  }
}


Check code on git: Hoffman.scala.

People help the people 

The video has nothing to do with this post. It's my all time love. I am listening to it when I write this blog. "If I had a brain, I'd be cold as a stone and rich as a fool, that turned all those good hearts away."