Introduction To Marketing Technology Course

Real Time Signaling – Storm Topologies

Philosopher: Which came first – the chicken or the egg?

Storm Developer: It depends on the topology!

WHEN

If you are an advertiser, there are two common situations when you will absolutely need real time event processing:

  1. Transactional Emails / Trigger Based Emails: The user performs an action in one part of your product ecosystem, and you want to send an email to the user immediately!
  2. Mobile App Marketing – without SDK: If your company is wary of integrating 3rd party SDKs, yet you want to spend advertising dollars to acquire new users or to introduce new features to your existing users, then it is critical to send feedback to the advertising marketplace so that you get more traffic for the money spent. More than 200% traffic for the money spent!

HOW

You have many options when it comes to implementing Real Time Signaling. Apache Storm is a popular solution that I have used for both use cases.

Apache Storm is an infrastructure that runs on multiple nodes. Think of Storm as the orchestra that can play any music. The topology is the song, that is, the actual business logic that you want to run on Storm.

What kind of business logic would you normally want to run on Storm? 99% of the time, it is as simple as:

  1. Read the real time input data from the source – (This is done by Storm Spout)
  2. Filter out unwanted records – (This is done by Storm Bolt)
  3. Extract desired columns – (This is done by Storm Bolt)
  4. Send those columns to an external endpoint over HTTP – (This is done by Storm Bolt)

As we can see, we usually read once and then perform a number of small operations. The reading is a special step, performed by the Storm component called Spout. The individual operations are numerous, and each one is performed by a Storm component called Bolt.

In this example, we had one Spout and three Bolts.
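The four steps can be sketched in plain Java before any Storm API is involved. This is only an analogy under stated assumptions: the class and method names (`PipelineSketch`, `filterBolt`, and so on), the sample records and the "ignore heartbeats" rule are all made up for illustration, and the "send" step merely collects strings instead of calling an endpoint.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java stand-in for one spout feeding three bolts. None of this is Storm API.
class PipelineSketch {

    // "Spout": hands out raw records one at a time. Here the source is a fixed list.
    static Iterator<Map<String, String>> spout() {
        List<Map<String, String>> events = new ArrayList<>();
        events.add(Map.of("event", "signup", "user", "u1", "page_uri", "/welcome"));
        events.add(Map.of("event", "heartbeat", "user", "u2", "page_uri", "/ping"));
        events.add(Map.of("event", "purchase", "user", "u3", "page_uri", "/cart"));
        return events.iterator();
    }

    // "Bolt" 1: drop records we do not care about (hypothetical rule: ignore heartbeats).
    static Optional<Map<String, String>> filterBolt(Map<String, String> record) {
        return "heartbeat".equals(record.get("event")) ? Optional.empty() : Optional.of(record);
    }

    // "Bolt" 2: keep only the columns the endpoint needs.
    static Map<String, String> extractBolt(Map<String, String> record) {
        Map<String, String> out = new LinkedHashMap<>();
        out.put("event", record.get("event"));
        out.put("user", record.get("user"));
        return out;
    }

    // "Bolt" 3: stand-in for the HTTP call; it only collects what would be sent.
    static void sendBolt(Map<String, String> columns, List<String> outbox) {
        outbox.add(columns.get("user") + ":" + columns.get("event"));
    }

    // Wire the components together in order: spout -> filter -> extract -> send.
    static List<String> run() {
        List<String> outbox = new ArrayList<>();
        Iterator<Map<String, String>> source = spout();
        while (source.hasNext()) {
            filterBolt(source.next())
                .map(PipelineSketch::extractBolt)
                .ifPresent(columns -> sendBolt(columns, outbox));
        }
        return outbox;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [u1:signup, u3:purchase]
    }
}
```

In real Storm the wiring inside `run()` is not written by hand; it is declared once and Storm moves the records between components, possibly across machines.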

When we write a Storm Program, we are really writing the spouts, the bolts and how they are connected to each other.

How they are connected to each other – that specification – is called topology.
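To make "the connections are the topology" concrete, here is a toy model in plain Java. It is a sketch, not how Storm stores topologies: the class `TopologySketch` and its single-input map are assumptions made for illustration (real bolts can read from several components). Each component names the component it reads from, and the processing order falls out of those connections alone.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a topology: each component lists the one component it reads from.
// Illustration only; this is not Storm's internal representation.
class TopologySketch {

    // Returns component names in an order where every component comes after its input.
    // A null input marks a spout, which reads from outside the topology.
    static List<String> processingOrder(Map<String, String> readsFrom) {
        List<String> order = new ArrayList<>();
        while (order.size() < readsFrom.size()) {
            int before = order.size();
            for (Map.Entry<String, String> e : readsFrom.entrySet()) {
                boolean placed = order.contains(e.getKey());
                boolean inputReady = e.getValue() == null || order.contains(e.getValue());
                if (!placed && inputReady) {
                    order.add(e.getKey());
                }
            }
            if (order.size() == before) {
                // No progress in a full pass: a cycle, or an input that was never declared.
                throw new IllegalArgumentException("cycle or missing input in topology");
            }
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, String> eggFirst = new LinkedHashMap<>();
        eggFirst.put("spout", null);
        eggFirst.put("egg", "spout");
        eggFirst.put("chicken", "egg");
        System.out.println(processingOrder(eggFirst)); // prints [spout, egg, chicken]

        Map<String, String> chickenFirst = new LinkedHashMap<>();
        chickenFirst.put("spout", null);
        chickenFirst.put("chicken", "spout");
        chickenFirst.put("egg", "chicken");
        System.out.println(processingOrder(chickenFirst)); // prints [spout, chicken, egg]
    }
}
```

The same two bolts run in opposite orders purely because the connections differ, which is the point of the opening joke.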

Thus, we have a lot of jargon, to describe something very simple. The reason for the jargon is that Storm is capable of very complex behavior. Yet, the reality is, for marketing purposes, we only need the simplest features.

Extending the analogy of Storm as an orchestra and the topology as a song, we see that a topology is actually multiple sheets of music. There is a spout sheet. There is one sheet for each bolt. The sheets themselves are ordered in a stack, and that ordering is important as well.

So, when we submit a topology to a Storm cluster, we are really submitting multiple tiny individual programs together. We could bundle them in a zip file. Since these tiny programs are written in Java, there is a natural zip-based file format for Java: the JAR file.
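That a jar really is a zip archive can be checked with nothing but the JDK. The sketch below writes a tiny jar with `java.util.jar.JarOutputStream` and then lists it using only the generic `java.util.zip.ZipFile` reader. The class name `JarIsZipSketch` and the entry names are hypothetical, and the entries hold placeholder bytes rather than real compiled classes.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

class JarIsZipSketch {

    // Writes a small jar containing the given entry names, then reads the
    // entry names back through the plain zip API.
    static List<String> writeJarThenReadAsZip(String... entryNames) {
        try {
            Path jar = Files.createTempFile("topology-sketch", ".jar");
            try {
                // Step 1: write the archive with the jar-specific writer.
                try (OutputStream file = Files.newOutputStream(jar);
                     JarOutputStream out = new JarOutputStream(file)) {
                    for (String name : entryNames) {
                        out.putNextEntry(new JarEntry(name));
                        out.write("placeholder".getBytes(StandardCharsets.UTF_8));
                        out.closeEntry();
                    }
                }
                // Step 2: open the same file with the generic zip reader.
                List<String> names = new ArrayList<>();
                try (ZipFile zip = new ZipFile(jar.toFile())) {
                    Enumeration<? extends ZipEntry> entries = zip.entries();
                    while (entries.hasMoreElements()) {
                        names.add(entries.nextElement().getName());
                    }
                }
                return names;
            } finally {
                Files.deleteIfExists(jar);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeJarThenReadAsZip(
            "com/company/example/TopicVerifierTopology.class",
            "com/company/example/TopicVerifierBolt.class"));
    }
}
```

A topology jar is the same idea at a larger scale: the compiled spout, bolt and topology classes travel to the cluster as entries in one archive.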

STORM TOPOLOGY QUICKSTART GUIDE

We are primarily interested in the business logic portion of a Storm topology. As always, we will assume that someone else has already figured out how to set up a Java development environment, create a basic topology and submit it to a Storm cluster. Some external links that you can reference if you are setting up everything on your own:

I will paste some code below, and highlight the portions that are business logic specific.

First is the “topology_specification” file. It also specifies the spout. 3 relevant lines are highlighted.

Second is the bolt. 5 relevant lines are highlighted.

We will assume that we are able to build the jar file by executing the Maven command.

Finally, we will see instructions on submitting the topology jar file to Storm.

Topology Specification File And Spout Specification

package com.company.example;
import java.util.Arrays;
import com.yahoo.adsdata.fetl.kq.storm.KQSpoutFactory;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

public class TopicVerifierTopology {
    // Base topology: owns the builder and registers the spout that reads the input stream.
    private static class KQTopology {
        protected TopologyBuilder builder;
        protected ExampleConf configuration;
        public KQTopology(final ExampleConf conf) {
            configuration = conf;
            builder = new TopologyBuilder();
            // Register the spout as "kqSpout"; parallelism, CPU and memory come from configuration.
            builder.setSpout("kqSpout", KQSpoutFactory.makeKQSpout(conf), configuration.getInt(ExampleConf.TOPIC_VERIFIER_SPOUTS))
                .setCPULoad(configuration.getInt(ExampleConf.TOPIC_VERIFIER_KQ_SPOUT_CPU))
                .setMemoryLoad(configuration.getInt(ExampleConf.TOPIC_VERIFIER_KQ_SPOUT_MEMORY));
        }

        public StormTopology createTopology() {
            return builder.createTopology();
        }
    }

    // Full topology: adds the bolt and connects it to the spout.
    private static class AllTopology extends KQTopology {
        public AllTopology(final ExampleConf conf) {
            super(conf);
            // Register the bolt and subscribe it to "kqSpout"; shuffle grouping spreads tuples randomly across bolt instances.
            builder.setBolt("TopicVerifierBolt", new TopicVerifierBolt(conf), configuration.getInt(ExampleConf.TOPIC_VERIFIER_BOLTS))
                .setCPULoad(configuration.getInt(ExampleConf.TOPIC_VERIFIER_BOLT_CPU))
                .setMemoryLoad(configuration.getInt(ExampleConf.TOPIC_VERIFIER_BOLT_MEMORY))
                .shuffleGrouping("kqSpout");
        }
    }
}
==========

Bolt Specification File


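package com.company.example;

import java.util.Map;
import org.apache.log4j.Logger; // assumption: log4j 1.x, which matches Logger.getLogger(Class)
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;

// BaseExampleBolt, ExampleConf and KQRecord are in-house classes that are not shown here.
public class TopicVerifierBolt extends BaseExampleBolt  {
    public static Logger logger = Logger.getLogger(TopicVerifierBolt.class);
    // The topology constructs this bolt with an ExampleConf, so the constructor takes the same type.
    public TopicVerifierBolt(ExampleConf config) {
        super(config);
    }

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        super.prepare(conf, context, collector);
    }

    // Called once per incoming tuple: this is where the business logic lives.
    @Override
    public void execute(Tuple kqTuple) {
        KQRecord pr = getKQRecord(kqTuple);
        // Extract the desired columns from the record.
        String event = (String) pr.get("event");
        String ptyId = (String) pr.get("ptyid");
        String mobile_app_screen_name = (String) pr.get("mobile_app_screen_name");
        String page_uri = (String) pr.get("page_uri");
        logger.info(":ptyId = " + ptyId + ":\t  mobile_app_screen_name= " + mobile_app_screen_name + ":\tevent = " + event);
        // Acknowledge the tuple so Storm knows it was fully processed.
        outputCollector.ack(kqTuple);
    }
}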
==========
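The bolt above only logs the fields it extracts. For the two use cases in this post, the last step is to send those fields to an external endpoint over HTTP. Below is a minimal sketch of building such a request with the JDK's `java.net.http` API (Java 11 or later). The endpoint URL, the class name `SignalRequestSketch` and the form-encoded body format are assumptions for illustration; use whatever your email or advertising partner actually documents.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class SignalRequestSketch {

    // Hypothetical endpoint, not a real service.
    static final URI ENDPOINT = URI.create("https://signals.example.com/v1/events");

    // Encodes the extracted columns as a form body, e.g. "ptyid=p-42&event=signup".
    // Keys and values must be non-null.
    static String formBody(Map<String, String> columns) {
        return columns.entrySet().stream()
            .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8)
                + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
            .collect(Collectors.joining("&"));
    }

    // Builds the POST request. A bolt would pass it to HttpClient.send(...)
    // and ack the tuple once the call has succeeded.
    static HttpRequest buildRequest(Map<String, String> columns) {
        return HttpRequest.newBuilder(ENDPOINT)
            .header("Content-Type", "application/x-www-form-urlencoded")
            .POST(HttpRequest.BodyPublishers.ofString(formBody(columns)))
            .build();
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("ptyid", "p-42");
        columns.put("event", "signup");
        System.out.println(formBody(columns)); // prints ptyid=p-42&event=signup
        System.out.println(buildRequest(columns).method()); // prints POST
    }
}
```

Building the request separately from sending it keeps the business logic easy to test without a network.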

Instructions for submitting topology to Storm

/usr/bin/storm jar ${JAR_PATH} -c topology.worker.childopts="

CONCLUSION

In this post we looked at Storm topologies at a conceptual level. If the chicken and egg were both bolts, then which came first would depend on the topology 🙂 When you read the specifics of how Storm works on the Apache Storm site, this article gives you context on when to use it in marketing. Finally, we have seen a real topology to demonstrate that we need to tinker with fewer than 10 lines of code to change the business logic.
