
Thursday, December 24, 2015

Spark Streaming Note: How to parse configuration correctly

I was testing Spark Streaming when I encountered the following problem:

Considering the following example:


import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class StreamingWorking {

 public static void main(String[] args) {
  String topic = "test";
  int numThreads = 1;
  String checkpointDirectory = "/home/temp";
  String zookeeper = "localhost:2181";
  String consumerGroupId = "streaming_test";
  SparkConf sparkConf = new SparkConf().setAppName("StreamingTest");
  sparkConf.setMaster("spark://localhost:7077");
  sparkConf.setSparkHome("/home/spark");
  String[] jars = {"/home/spark/spark_try.jar"};
  sparkConf.setJars(jars);
  // must be set before the context is created, or it has no effect
  sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
  JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(2));
  context.checkpoint(checkpointDirectory);
  Map<String, Integer> topics = new HashMap<String, Integer>();
  topics.put(topic, numThreads);
  JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(context, zookeeper,
    consumerGroupId, topics, StorageLevel.MEMORY_AND_DISK_SER());
  JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
   public String call(Tuple2<String, String> tuple2) {
    return tuple2._2();
   }
  });
  lines.print();
  context.start();
  context.awaitTermination();
 }
}


This works perfectly fine if you compile and run it.
However, suppose we want to process each incoming message based on some external configuration:

public class StreamingNotWorking {
 private static Config config1;
 private static Properties configFile;

 public StreamingNotWorking(Config config1, Properties configFile) {
  StreamingNotWorking.config1 = config1;
  StreamingNotWorking.configFile = configFile;
 }

 public void runStreaming() {
  String topic = configFile.getProperty("topic");
  int numThreads = Integer.parseInt(configFile.getProperty("numThreads"));
  String checkpointDirectory = configFile.getProperty("checkpointDirectory");
  String zookeeper = configFile.getProperty("zookeeper");
  String consumerGroupId = configFile.getProperty("consumerGroupId");
  SparkConf sparkConf = new SparkConf().setAppName("StreamingTest");
  sparkConf.setMaster(configFile.getProperty("sparkMaster"));
  sparkConf.setSparkHome(configFile.getProperty("sparkHome"));
  String[] jars = {"/home/spark/spark_try.jar"};
  sparkConf.setJars(jars);
  sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
  JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(2));
  context.checkpoint(checkpointDirectory);
  Map<String, Integer> topics = new HashMap<String, Integer>();
  topics.put(topic, numThreads);
  JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(context, zookeeper,
    consumerGroupId, topics, StorageLevel.MEMORY_AND_DISK_SER());
  JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
   public String call(Tuple2<String, String> tuple2) {
    return tuple2._2();
   }
  });
  JavaDStream<ProcessingMessage> processedMessage = lines.map(new Function<String, ProcessingMessage>() {
   public ProcessingMessage call(String lines) {
    // config1 is a static field initialized on the driver
    return new ProcessingMessage(lines, config1);
   }
  });
  processedMessage.print();
  context.start();
  context.awaitTermination();
 }

 private static class ProcessingMessage implements Serializable {
  private static Config config1;

  public ProcessingMessage(String lines, Config config1) {
   ProcessingMessage.config1 = config1;
  }
 }
}

public class StreamingMain {
 public static void main(String[] args) {
  try {
   Properties configFile = new Properties();
   configFile.load(StreamingMain.class.getClassLoader().getResourceAsStream(args[0]));
   Config config = new Config(configFile.getProperty("config"));
   new StreamingNotWorking(config, configFile).runStreaming();
  } catch (Exception e) {
   System.err.println(e);
  }
 }
}
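For reference, the configuration file loaded by StreamingMain might look like the following. This is only a hypothetical sketch: the property names are taken from the getProperty calls above, and all values are placeholders.

```properties
# hypothetical config.properties -- all values are placeholders
topic=test
numThreads=1
checkpointDirectory=/home/temp
zookeeper=localhost:2181
consumerGroupId=streaming_test
sparkMaster=spark://localhost:7077
sparkHome=/home/spark
config=some_config_value
```

The file is loaded from the classpath via getResourceAsStream, so it has to be on the application's classpath, not just on disk.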

The above code throws a NullPointerException in the Config class. I spent several fruitless days trying to figure out where the problem was, until a colleague enlightened me:


public class StreamingWorkingAgain {

 public void runStreaming(String configFileString) throws IOException {
  Properties configFile = new Properties();
  configFile.load(this.getClass().getClassLoader().getResourceAsStream(configFileString));
  String topic = configFile.getProperty("topic");
  int numThreads = Integer.parseInt(configFile.getProperty("numThreads"));
  String checkpointDirectory = configFile.getProperty("checkpointDirectory");
  String zookeeper = configFile.getProperty("zookeeper");
  String consumerGroupId = configFile.getProperty("consumerGroupId");
  SparkConf sparkConf = new SparkConf().setAppName("StreamingTest");
  sparkConf.setMaster(configFile.getProperty("sparkMaster"));
  sparkConf.setSparkHome(configFile.getProperty("sparkHome"));
  String[] jars = {"/home/spark/spark_try.jar"};
  sparkConf.setJars(jars);
  sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
  JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(2));
  context.checkpoint(checkpointDirectory);
  Map<String, Integer> topics = new HashMap<String, Integer>();
  topics.put(topic, numThreads);
  JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(context, zookeeper,
    consumerGroupId, topics, StorageLevel.MEMORY_AND_DISK_SER());
  JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
   public String call(Tuple2<String, String> tuple2) {
    return tuple2._2();
   }
  });
  JavaDStream<ProcessingMessage> processedMessage = lines.map(new Function<String, ProcessingMessage>() {
   public ProcessingMessage call(String lines) {
    // only the String is captured here; the Config is built on the worker
    return new ProcessingMessage(lines, configFileString);
   }
  });
  processedMessage.print();
  context.start();
  context.awaitTermination();
 }

 private static class ProcessingMessage implements Serializable {
  private static Config config1;

  public ProcessingMessage(String lines, String configFileString) {
   if (config1 == null) { // initialize only once per worker JVM
    Properties config = new Properties();
    try {
     config.load(this.getClass().getClassLoader().getResourceAsStream(configFileString));
    } catch (IOException e) {
     e.printStackTrace();
    }
    config1 = new Config(config.getProperty("config1"));
   }
  }
 }
}

public class StreamingMain {
 public static void main(String[] args) {
  String configFileString = args[0];
  try {
   new StreamingWorkingAgain().runStreaming(configFileString);
  } catch (Exception e) {
   System.err.println(e);
  }
 }
}

The only difference is that instead of passing the Properties object ("configFile") and the Config object ("config1") to the workers, a plain String ("configFileString") is passed into the ProcessingMessage class that backs the JavaDStream. So why does this work?


In Spark, the SparkContext is created in the driver program (see the figure below), in this part of the code:

SparkConf sparkConf = new SparkConf().setAppName("StreamingTest");
sparkConf.setMaster(configFile.getProperty("sparkMaster"));
sparkConf.setSparkHome(configFile.getProperty("sparkHome"));
String[] jars = {"/home/spark/spark_try.jar"};
sparkConf.setJars(jars);
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(2));
context.checkpoint(checkpointDirectory);


Source: http://spark.apache.org/docs/latest/cluster-overview.html


After that, the Spark configuration and all the processing functions are serialized and sent to the worker nodes, where all message processing is done. Java serialization does not include static fields. So if the Config is a static object initialized in the driver program and referenced directly on a worker node, it is never reinitialized on the worker: the reference is null there, and we get the NullPointerException.

Instead, we can pass the configFile string to the worker node and initialize the Config on the worker itself. Since config1 is static in the ProcessingMessage class, it only needs to be initialized once per worker JVM.
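The mechanism can be seen with plain Java serialization, outside of Spark. The sketch below (class and field names are made up for illustration) serializes an object, clears the static field to simulate a fresh worker JVM, and deserializes it: the instance field survives the round trip, but the static field does not travel with the object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticFieldDemo {
    static class Message implements Serializable {
        private static final long serialVersionUID = 1L;
        static String config;    // static: NOT part of the serialized form
        final String payload;    // instance field: serialized normally
        Message(String payload) { this.payload = payload; }
    }

    public static void main(String[] args) throws Exception {
        Message.config = "loaded-on-driver";
        Message m = new Message("hello");

        // Serialize on the "driver"
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(m);
        oos.close();

        // Simulate a fresh worker JVM: static state does not travel
        Message.config = null;

        // Deserialize on the "worker"
        ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()));
        Message received = (Message) ois.readObject();

        System.out.println("payload=" + received.payload); // hello
        System.out.println("config=" + Message.config);    // null
    }
}
```

This is exactly what happens to the static Config: each worker JVM starts with the static field unset, so anything the worker needs has to be either serializable instance state or rebuilt on the worker side.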



* The code is just an example; it may not compile as-is.

Monday, December 21, 2015

The end, and a new start

Finally, my thesis was accepted this morning. Now I am officially a Ph.D., which is useless in every part of my current life.

So ironic, and pathetic.  

The same time last year, the weather was similar, but much colder. I was probably on campus, having just headed back from Marshall St. to get another cup of coffee, and starting to practice Leetcode. I was struggling and scared at that time, not knowing what would happen the next year. But I refused to give up.

Now I am sitting in my apartment in CA, enjoying my first day of the two-week vacation, and writing this blog. I am happy.

Half a year after I moved to the Bay Area, the words I have said most often to people are: "This is the best decision I have ever made in my life". I am glad to have the life I am having now, and I am grateful.

My job search was not that smooth. I only got two onsite interviews. On the day I got rejected by the first company, I was devastated. I texted my friend: "This was a huge humiliation in my life and I will never let it happen again." He texted back: "This is just part of your life journey, and soon you will realize it's nothing."

He was right. Two days later I got an offer from my current company. The next couple of months became the best time of my life since high school. My colleagues are friendly, the projects are fun, I have met lots of new friends, and life is awesome.

For the first month in my new company, all I was thinking about was quitting and finding a job at a "better" company. My company is not one of those famous big companies in Silicon Valley. It is a game company, and I don't even play games very often. It's a startup which only provides options but no actual stock (earning big once it goes IPO is another story). Those were the reasons I wanted to leave. But three months later, I started to think again. Apart from those reasons, I have everything I want in a job: a reasonable boss, a great mentor, friendly colleagues, fun projects, lots of things to learn, a decent salary, and good benefits. I ask myself: is it that important to get into one of those "famous" companies? I might not have as much fun in my job there, and the people around me might not be as nice as the ones I have now. I am not sure what decision I will make next year, but I can always give it a try.

I always thought I could only get the second-best thing; that it was my destiny. Now I think nothing is, in fact, perfect. If I had gotten into the first company, which was my dream company, I would probably have ended up in a boring job. Everything is "not perfect, but good enough". All I have to do is be grateful for what I have and try my best to get what I want.


I always tried to grasp whatever I wanted. I thought if I tried harder, or tried another way, it might work. Now I think that might not be the case. Everything is about the "right" thing at the "right" moment. I was too stubborn. Now I always ask myself: how can I be so sure that this is the "right" thing? And if I cannot convince myself, it probably is also not the right time to pursue it.

Yet there is no such thing as perfect timing, and at some point I have to make decisions. It probably is true that I should wait until I have gathered enough information. But until when? My friend said: if you cannot make a decision, then wait until the last moment and follow your heart, because that will always be what you want. I can't disagree, but I don't agree either. Betting my life on randomness is definitely not who I am. I prefer doing more analysis and making rational decisions. However, life is indeed balanced: I choose one thing, and I always lose another. So once I make up my mind, I shouldn't regret it.

2015 was a year of life transitions. I experienced and learned so many things. It's the end of the pathetic-graduate-student era, but the start of an independent Shirley. 2016 will be another exciting year. I am not sure whether I will be granted the visa and get to stay in the States. I may be deprived of everything I have right now. It's scary, but if that happens, I will (have to) figure out a way.

Merry Christmas and Happy holidays, to everyone.