Introduction To Marketing Technology Course

Pig Scripts and Oozie Workflow

Don’t teach a pig to sing. It wastes your time and annoys the pig – Robert Heinlein

Pig Scripts query big data and generate output files.

Once.

If you want them to do the same thing every day, you need to schedule them.

That is where Oozie comes in. Oozie is a workflow scheduler for Hadoop – for our purposes, a pig script scheduler.

Now, obviously, you want the pig script to process new data on each run. And generate new output.

Therefore, the input and output locations for the pig script need to be variables that depend on time.

In the following example, we will examine a simple pig script that runs hourly. It parses the beacon URL and pulls out its fields:

-- Load the hourly beacon table via HCatalog
benzene = LOAD '$BENZENE_DB_AND_TABLE' USING org.apache.hive.hcatalog.pig.HCatLoader();
-- Keep only the current hour's partition, for the space id we care about
spaceid_filter = FILTER benzene BY dt == '$DATE' AND spaceid == '1197805094';
-- Project down to just the beacon URL
spaceid_data = FOREACH spaceid_filter GENERATE page_uri AS page_uri;
-- Pull the individual fields out of the URL with a regex
spaceid_fields = FOREACH spaceid_data GENERATE FLATTEN(REGEX_EXTRACT_ALL(page_uri, '^.*ptyid=(.+?)&event=(.+?)&bid=(.+?)&ts=(.+?)&batch=(.+?)&logtime=(.+?)$'));
-- De-duplicate, using 100 reducers
data = DISTINCT spaceid_fields PARALLEL 100;
-- Store one record per line, fields separated by Ctrl-A (\u0001)
STORE data INTO '$OUTPUT' USING PigStorage('\u0001');
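
To make the regex concrete, here is what the extraction does to one hypothetical beacon URL (the host and all field values below are invented for illustration):

page_uri:  https://beacon.example.com/p?ptyid=12345&event=install&bid=b01&ts=1519178400&batch=7&logtime=2018022102
extracted: (12345, install, b01, 1519178400, 7, 2018022102)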

As expected, we see that there are three parameters – $BENZENE_DB_AND_TABLE, $DATE and $OUTPUT. The last two are the time-dependent ones: each hourly run needs a fresh date and a fresh output location.
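
Before wiring the script into Oozie, you can test it by hand by supplying the parameters on the command line. A minimal sketch (the values are placeholders; -useHCatalog is needed because the script loads through HCatLoader):

pig -useHCatalog \
    -param BENZENE_DB_AND_TABLE=benzene.hourly_data \
    -param DATE=2018022102 \
    -param OUTPUT=/tmp/spaceid_test \
    benzeneappsflyerspaceid.pig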

These variables are supplied by a workflow. A workflow is one unit of the job – one run. If your script runs daily, you will have one workflow per day, about 30 workflows per month, and so on.

Here is the workflow:

<?xml version="1.0"?>
<workflow-app xmlns="uri:oozie:workflow:0.5" xmlns:sla="uri:oozie:sla:0.2" name="Benzene_Appsflyer_SpaceId_Hourly">
<credentials>
  <credential name="hcatauth" type="hcat">
    <property>
      <name>hcat.metastore.uri</name>
      <value>${benzene_hcat_uri}</value>
    </property>
    <property>
      <name>hcat.metastore.principal</name>
      <value>${benzene_hcat_principal}</value>
    </property>
  </credential>
</credentials>
<start to="benzene"/>
<action name="benzene" cred="hcatauth">
  <pig>
    <job-tracker>${job_tracker}</job-tracker>
    <name-node>${name_node}</name-node>
    <prepare>
      <delete path="${output_dir}"/>
    </prepare>
    <job-xml>job.xml</job-xml>
    <configuration>
      <property>
        <name>mapred.job.queue.name</name>
        <value>${queue_name}</value>
      </property>
      <property>
        <name>hive.metastore.client.socket.timeout</name>
        <value>200</value>
      </property>
      <property>
        <name>oozie.action.sharelib.for.pig</name>
        <value>pig_current,hcat_current</value>
      </property>
    </configuration>
    <script>/${src_path}/benzeneappsflyerspaceid.pig</script>
    <param>BENZENE_DB_AND_TABLE=${benzene_db_and_table}</param>
    <param>DATE=${datehour}</param>
    <param>edv=${epoch_date_value}</param>
    <param>OUTPUT=${output_dir}</param>
    <file>hive-site.xml</file>
  </pig>
  <ok to="change_file_permission"/>
  <error to="fail"/>
</action>
<action name="change_file_permission">
  <fs>
    <chmod path='${output_dir}' permissions='750' dir-files='true'/>
  </fs>
  <ok to="end"/>
  <error to="fail"/>
</action>
<kill name="fail">
 <message>Failed Oozie pipeline.</message>
</kill>
<end name='end' />
</workflow-app>

A few lines in this workflow deserve special attention.

The first is the <start> element. It specifies which action will be executed first.

The workflow is a specification of a series of actions. In this workflow, we have four named nodes –

  1. benzene (a pig action)
  2. change_file_permission (a filesystem action)
  3. end (the end node)
  4. fail (a kill node)

The node we are most interested in is the ‘benzene’ action. It executes the pig script, calling it with params. Note that ‘datehour’ and ‘output_dir’ – the time-dependent variables – are referenced in this action but defined nowhere in the workflow itself. They have to come from outside.
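
As an aside, a workflow XML can be checked against Oozie’s schema before it is deployed. A quick sketch with the Oozie CLI (older releases validate the local file directly; newer ones route validation through the server and also need -oozie ${oozie_url}):

oozie validate workflow.xml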

This workflow is scheduled by the coordinator. At each scheduled time, the coordinator creates an instance of the workflow, fills in its time-dependent variables, and submits it for execution. Let us take a look at the coordinator (note frequency="60" – sixty minutes, i.e. hourly):

<?xml version="1.0"?>
<coordinator-app xmlns="uri:oozie:coordinator:0.4" frequency="60" start="${start_time}" end="${end_time}" timezone="UTC" name="Benzene_Appsflyer_SpaceId_Hourly">
<controls>
  <timeout>-1</timeout>
  <concurrency>1</concurrency>
  <execution>FIFO</execution>
</controls>
<datasets>
  <dataset name="benzene_hcat" frequency="60" initial-instance="${initial_instance}" timezone="UTC">
      <uri-template>${benzene_hcat_uri}/${benzene_db}/${benzene_table}/dt=${YEAR}${MONTH}${DAY}${HOUR}</uri-template>
  </dataset>
  <dataset name="out_dir" frequency="60" initial-instance="${initial_instance}" timezone="UTC">
      <uri-template>${out_dir}/${YEAR}${MONTH}${DAY}${HOUR}/</uri-template>
  </dataset>
</datasets>
<input-events>
  <data-in name="BENZENE_HCAT" dataset="benzene_hcat">
    <instance>${coord:current(0)}</instance>
  </data-in>
</input-events>
<output-events>
  <data-out name="OUT_DIR" dataset="out_dir">
    <instance>${coord:current(0)}</instance>
  </data-out>
</output-events>
<action>
  <workflow>
    <app-path>/${src_path}/workflow.xml</app-path>
    <configuration>
      <property>
        <name>output_dir</name>
        <value>${coord:dataOut('OUT_DIR')}</value>
      </property>
      <property>
        <name>datehour</name>
        <value>${coord:formatTime(coord:nominalTime(), 'yyyyMMddHH')}</value>
      </property>
    </configuration>
  </workflow>
</action>
</coordinator-app>

As can be seen in the <action> section at the bottom, output_dir and datehour are set by the coordinator. The datehour is derived programmatically from the nominal run time. The output_dir, though, depends on one more variable – OUT_DIR.

We trace OUT_DIR to earlier code in the coordinator – the <data-out> output event, which points at the ‘out_dir’ dataset definition, whose uri-template in turn references the variable ${out_dir}.

When the coordinator is submitted to Oozie, it accepts a few configuration parameters, specified in a properties file. Here is an example:

name_node=hdfs://elementred-nn1.red.ygrid.yahoo.com:8020
job_tracker=elementred-jt1.red.ygrid.yahoo.com:8032
queue_name=default

src_path=user/xyz/Benzene_Appsflyer_SpaceId_Hourly
out_path=/projects/xyz/benzene/spaceid/hourly 

out_dir=${name_node}/${out_path}

# Oozie
oozie.libpath=${name_node}/${src_path}/lib
oozie.coord.application.path=${name_node}/${src_path}
oozie_url=https://elementred-oozie.red.ygrid.yahoo.com:4443/oozie

benzene_db=benzene
benzene_table=hourly_data
benzene_db_and_table=${benzene_db}.${benzene_table}

initial_instance=2017-01-01T22:00Z
start_time=2018-02-21T02:00Z
end_time=2022-11-17T00:00Z

Note the out_dir line – it is assembled from name_node and out_path, and it is the value the coordinator’s dataset definition expands.
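
To see how everything resolves, take the workflow instance whose nominal time is 2018-02-21T02:00Z (the start_time above). With the values in this properties file:

datehour   = coord:formatTime(coord:nominalTime(), 'yyyyMMddHH')
           = 2018022102

output_dir = coord:dataOut('OUT_DIR')
           = ${out_dir}/${YEAR}${MONTH}${DAY}${HOUR}/
           = hdfs://elementred-nn1.red.ygrid.yahoo.com:8020/projects/xyz/benzene/spaceid/hourly/2018022102/

The pig script for that hour therefore receives DATE=2018022102 and OUTPUT pointing at that hourly directory.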

PUTTING IT ALL TOGETHER

  1. We need to upload the pig script, the workflow and the coordinator to HDFS.
  2. Then, we submit the coordinator to Oozie by submitting the properties file (see the example commands after this list).
  3. The variables defined in the properties file are used by the coordinator to create workflows and schedule them.
  4. The workflow itself consists of actions – one of which could be a pig script. The workflow inherits the variables from the coordinator and passes them to the pig script as command line parameters.
  5. The pig script is invoked; when it completes successfully, it has stored the data in the output location.
  6. The pig action is typically one of a series of actions in the workflow. Once the pig script completes, the workflow moves on to the next action.
  7. After the final action – typically sending a success email – the workflow ends.
  8. The coordinator keeps scheduling subsequent workflows based on its frequency and the end time specified in the properties file.
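
In command form, the first two steps look roughly like this (a sketch – it assumes the properties file is saved as job.properties and uses the paths and URL from the example above):

# 1. Upload the pig script, workflow and coordinator to HDFS
hdfs dfs -put benzeneappsflyerspaceid.pig workflow.xml coordinator.xml /user/xyz/Benzene_Appsflyer_SpaceId_Hourly/

# 2. Submit the coordinator to Oozie with the properties file
oozie job -oozie https://elementred-oozie.red.ygrid.yahoo.com:4443/oozie -config job.properties -run

# Later, check on the coordinator and its materialized workflows
oozie job -oozie https://elementred-oozie.red.ygrid.yahoo.com:4443/oozie -info <coordinator-id>

From there, Oozie takes over and runs the pipeline every hour until the end time.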
