Thursday, December 24, 2015

Spark Streaming Note: How to pass configuration correctly

I was testing Spark Streaming when I encountered the following problem.

Consider the following example:


import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class StreamingWorking {

 public static void main(String[] args) {
  String topic = "test";
  int numThreads = 1;
  String checkpointDirectory = "/home/temp";
  String zookeeper = "localhost:2181";
  String consumerGroupId = "streaming_test";
  SparkConf sparkConf = new SparkConf().setAppName("StreamingTest");
  sparkConf.setMaster("spark://localhost:7077");
  sparkConf.setSparkHome("/home/spark");
  String[] jars = {"/home/spark/spark_try.jar"};
  sparkConf.setJars(jars);
  // This flag must be set before the StreamingContext is created to take effect.
  sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
  JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(2));
  context.checkpoint(checkpointDirectory);
  Map<String, Integer> topics = new HashMap<String, Integer>();
  topics.put(topic, numThreads);
  JavaPairInputDStream<String, String> messages = KafkaUtils.createStream(context, zookeeper,
    consumerGroupId, topics, StorageLevel.MEMORY_AND_DISK_SER());
  JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
   public String call(Tuple2<String, String> tuple2) {
    return tuple2._2();
   }
  });
  lines.print();
  context.start();
  context.awaitTermination();
 }
}


This works perfectly fine if you compile and run it.
However, suppose we want to process the incoming messages based on some input configuration:

public class StreamingNotWorking {
 private static Config config1;
 private static Properties configFile;

 public StreamingNotWorking(Config config1, Properties configFile) {
  StreamingNotWorking.config1 = config1;
  StreamingNotWorking.configFile = configFile;
 }

 public void runStreaming() {
  String topic = configFile.getProperty("topic");
  int numThreads = Integer.parseInt(configFile.getProperty("numThreads"));
  String checkpointDirectory = configFile.getProperty("checkpointDirectory");
  String zookeeper = configFile.getProperty("zookeeper");
  String consumerGroupId = configFile.getProperty("consumerGroupId");
  SparkConf sparkConf = new SparkConf().setAppName("StreamingTest");
  sparkConf.setMaster(configFile.getProperty("sparkMaster"));
  sparkConf.setSparkHome(configFile.getProperty("sparkHome"));
  String[] jars = {"/home/spark/spark_try.jar"};
  sparkConf.setJars(jars);
  sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
  JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(2));
  context.checkpoint(checkpointDirectory);
  Map<String, Integer> topics = new HashMap<String, Integer>();
  topics.put(topic, numThreads);
  JavaPairInputDStream<String, String> messages = KafkaUtils.createStream(context, zookeeper,
    consumerGroupId, topics, StorageLevel.MEMORY_AND_DISK_SER());
  JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
   public String call(Tuple2<String, String> tuple2) {
    return tuple2._2();
   }
  });
  JavaDStream<ProcessingMessage> processedMessage = lines.map(new Function<String, ProcessingMessage>() {
   public ProcessingMessage call(String lines) {
    // config1 was initialized in the driver JVM only; on a worker it is null.
    return new ProcessingMessage(lines, config1);
   }
  });
  processedMessage.print();
  context.start();
  context.awaitTermination();
 }

 private static class ProcessingMessage {
  private static Config config1;

  public ProcessingMessage(String lines, Config config1) {
   ProcessingMessage.config1 = config1;
  }
 }
}

public class StreamingMain {
 public static void main(String[] args) throws Exception {
  Properties configFile = new Properties();
  configFile.load(StreamingMain.class.getClassLoader().getResourceAsStream(args[0]));
  Config config = new Config(configFile.getProperty("config"));
  try {
   new StreamingNotWorking(config, configFile).runStreaming();
  } catch (Exception e) {
   System.err.println(e);
  }
 }
}
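
For reference, the examples assume a properties file along these lines on the classpath. The key names are taken from the getProperty() calls above; the values, and the exact numThreads key, are made-up placeholders:

# streaming_test.properties (hypothetical values)
topic=test
numThreads=1
checkpointDirectory=/home/temp
zookeeper=localhost:2181
consumerGroupId=streaming_test
sparkMaster=spark://localhost:7077
sparkHome=/home/spark
config=value_for_the_config_object
config1=value_for_the_worker_side_config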

The above code throws a NullPointerException on the Config class. I spent several fruitless days trying to figure out where the problem was. Then my colleague enlightened me:


public class StreamingWorkingAgain {

 public void runStreaming(String configFileString) throws IOException {
  Properties configFile = new Properties();
  configFile.load(this.getClass().getClassLoader().getResourceAsStream(configFileString));
  String topic = configFile.getProperty("topic");
  int numThreads = Integer.parseInt(configFile.getProperty("numThreads"));
  String checkpointDirectory = configFile.getProperty("checkpointDirectory");
  String zookeeper = configFile.getProperty("zookeeper");
  String consumerGroupId = configFile.getProperty("consumerGroupId");
  SparkConf sparkConf = new SparkConf().setAppName("StreamingTest");
  sparkConf.setMaster(configFile.getProperty("sparkMaster"));
  sparkConf.setSparkHome(configFile.getProperty("sparkHome"));
  String[] jars = {"/home/spark/spark_try.jar"};
  sparkConf.setJars(jars);
  sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
  JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(2));
  context.checkpoint(checkpointDirectory);
  Map<String, Integer> topics = new HashMap<String, Integer>();
  topics.put(topic, numThreads);
  JavaPairInputDStream<String, String> messages = KafkaUtils.createStream(context, zookeeper,
    consumerGroupId, topics, StorageLevel.MEMORY_AND_DISK_SER());
  JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
   public String call(Tuple2<String, String> tuple2) {
    return tuple2._2();
   }
  });
  JavaDStream<ProcessingMessage> processedMessage = lines.map(new Function<String, ProcessingMessage>() {
   public ProcessingMessage call(String lines) {
    // Only the file name (a String) is captured by the closure; the Config
    // is constructed on the worker node itself.
    return new ProcessingMessage(lines, configFileString);
   }
  });
  // Print the processed stream so the map above is actually executed.
  processedMessage.print();
  context.start();
  context.awaitTermination();
 }

 private static class ProcessingMessage {
  private static Config config1;

  public ProcessingMessage(String lines, String configFileString) {
   Properties config = new Properties();
   try {
    config.load(this.getClass().getClassLoader().getResourceAsStream(configFileString));
   } catch (IOException e) {
    e.printStackTrace();
   }
   ProcessingMessage.config1 = new Config(config.getProperty("config1"));
  }
 }

}

public class StreamingMain {
 public static void main(String[] args) {
  String configFileString = args[0];
  try {
   new StreamingWorkingAgain().runStreaming(configFileString);
  } catch (Exception e) {
   System.err.println(e);
  }
 }
}

The only difference is that instead of passing the Properties object ("configFile") and the Config object ("config1") into the ProcessingMessage class, a String ("configFileString") is passed in, and the Config is constructed there when the JavaDStream is built. So why does this work?


In Spark, the SparkContext is created in the driver program (see the figure below), which corresponds to the following part:

SparkConf sparkConf = new SparkConf().setAppName("StreamingTest");
sparkConf.setMaster(configFile.getProperty("sparkMaster"));
sparkConf.setSparkHome(configFile.getProperty("sparkHome"));
String[] jars = {"/home/spark/spark_try.jar"};
sparkConf.setJars(jars);
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(2));
context.checkpoint(checkpointDirectory);


[Figure: Spark cluster overview. The driver program, which holds the SparkContext, connects to a cluster manager, which allocates executors on the worker nodes.]
Source: http://spark.apache.org/docs/latest/cluster-overview.html


After that, the Spark configuration and the processing functions are serialized and sent to the worker nodes, where all message processing takes place. A static field, however, belongs to a class rather than to an instance: it is not serialized along with the closure, and every worker JVM has its own copy of it. If Config is held in a static field that is only initialized in the driver, that field stays null on the worker nodes, and dereferencing it there throws a NullPointerException.
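
This pitfall is easy to reproduce outside Spark. Here is a minimal sketch in plain Java (no Spark involved, class and field names are made up): serializing an object captures its instance fields but records nothing about static fields, so a second JVM that deserializes the object still sees its own, uninitialized copy of the static.

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Minimal sketch: static state does not travel with a serialized object.
public class StaticNotSerialized implements Serializable {
 static String config;               // belongs to the class, i.e. to each JVM
 String payload = "hello";           // instance field: this IS serialized

 public static void main(String[] args) throws Exception {
  config = "initialized in this JVM";
  ByteArrayOutputStream bytes = new ByteArrayOutputStream();
  new ObjectOutputStream(bytes).writeObject(new StaticNotSerialized());
  // The byte stream contains "hello" but no trace of 'config'. A worker JVM
  // that deserializes it gets the instance back, yet its own copy of
  // StaticNotSerialized.config is still null.
  System.out.println(bytes.size() + " bytes written; 'config' is not among them");
 }
}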

Instead, we can pass the configuration file name (a plain String, which serializes trivially) to the worker node and initialize the Config there. Since config1 is a static field of the ProcessingMessage class, it only needs to be loaded once per worker JVM, as the sketch below shows.
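
Note that as written above, the ProcessingMessage constructor reloads the properties file for every single message. A common refinement (a sketch only, reusing the same hypothetical Config class) is to make the initialization lazy, so the file is read once per worker JVM:

private static class ProcessingMessage {
 private static Config config1;

 public ProcessingMessage(String lines, String configFileString) throws IOException {
  synchronized (ProcessingMessage.class) {   // an executor may run several task threads
   if (config1 == null) {                    // load only once per worker JVM
    Properties props = new Properties();
    props.load(ProcessingMessage.class.getClassLoader()
        .getResourceAsStream(configFileString));
    config1 = new Config(props.getProperty("config1"));
   }
  }
 }
}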



* The code above is only an example; it may not compile as-is.
