Wednesday, March 23, 2016

Execute Oozie step within Pentaho Data Integration (PDI)

I recently found the need to execute a shell script within a Hadoop cluster. For a client project, after processing files via Pentaho MapReduce steps, we needed to move files on HDFS to an 'archive' location within the same cluster. A shell script containing HDFS commands, executed on an edge node, would easily perform the move.
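At its core the move is just an hdfs dfs -mv per file or directory; the path and file name below are purely illustrative, borrowed from the example used later in this post:

hdfs dfs -mv /data/landing/app_stream/some_file.txt /data/archive/app/app_stream/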

However, we were limited by the fact that we were running the Spoon client from Windows machines. We could execute the 'archive' shell script from the Data Integration server running on Linux, but that would force us to always use the DI server when executing.

What other options exist? Enter Oozie. Oozie ships with most, if not all, Hadoop distributions. Integrated with the Hadoop ecosystem, Oozie can be used to orchestrate a workflow of different Hadoop and operating system utilities. It supports Hive, Pig, Sqoop, and MapReduce, as well as system-level commands including shell scripts. More information on Oozie can be found here:
http://hortonworks.com/hadoop/oozie/.

Using Oozie, we can execute the shell script from our PDI job regardless of operating system (i.e. Spoon running on Windows, the Data Integration Server running on Linux, or the kitchen script running on Linux).

To configure this, Oozie requires a directory on HDFS referred to by the oozie.wf.application.path property. This property is required and points to the location of the application components. Within this directory you upload the components referenced from your Oozie workflow (e.g. Pig scripts, Hive SQL files, Java jar files, etc.). Alongside the component files, the workflow.xml specifies the actions and flow to be orchestrated on the cluster.
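As a rough sketch of the setup (assuming the /app/archive_files application path used in the properties later in this post), the directory can be created and the workflow definition uploaded from an edge node like so:

hdfs dfs -mkdir -p /app/archive_files
hdfs dfs -put workflow.xml /app/archive_files/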

For my client, I needed to execute a shell script to move files from the ingest directory to an archive directory. The following workflow.xml was uploaded to the HDFS directory specified via oozie.wf.application.path.

<workflow-app name="wf_archive_files" xmlns="uri:oozie:workflow:0.5">
    <start to="archive-files-action"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="archive-files-action">
        <shell xmlns="uri:oozie:shell-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <exec>${scriptPath}</exec>
            <argument>${src_dir}</argument>
            <argument>${target_dir}</argument>
            <file>${scriptPath}#${script}</file>
            <capture-output/>
        </shell>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>
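Before wiring the workflow into PDI, it can be worth validating the definition with the Oozie command line client; depending on your Oozie version you may also need to point the client at the server with the -oozie option:

oozie validate workflow.xml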

You'll notice the workflow is chock full of properties. In addition to the workflow.xml, Oozie requires these properties to be passed in at execution time. Within PDI, the Oozie Job Executor step offers two ways to specify them: via Quick Mode, you point to the location of a job.properties file; via Advanced Mode, you specify each property individually. I utilized Advanced Mode and specified the following properties:

oozie.wf.application.path=${nameNode}/app/archive_files
nameNode=hdfs://nameservice1
jobTracker=localhost:8032

scriptPath=/app/archive_files/${script}
script=archive_files.sh

src_dir=${SRC_DIR}
target_dir=${ARCHIVE_DIR}

Using Advanced Mode, I could dynamically specify the source & target folders using parameters passed to my PDI job.
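For reference, Quick Mode expects the same key/value pairs in a job.properties file (with literal paths in place of the PDI parameters). Such a file can also be used to smoke test the workflow directly from the Oozie command line, outside of PDI; the server URL below is just a placeholder:

oozie job -oozie http://oozie-host:11000/oozie -config job.properties -run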

Along with the workflow.xml, the archive_files.sh shell script also needed to be uploaded to the same location specified via oozie.wf.application.path.

#!/bin/sh
if [ "$#" -ne 2 ]; then
   echo "Usage: $0 "
   echo "Example: $0 /data/landing/app_stream /data/archive/app"
   exit
fi

src_dir=$1
target_dir=$2

# Recursively list everything under the source directory; the awk filter keeps
# lines containing an 'a' (which the HDFS paths here do) and prints column 8,
# the full path
for i in `hdfs dfs -ls -R $src_dir | awk '/a/ {print $8}'`; do
   # Rewrite the source prefix to the target prefix to build the mirrored path
   target_folder=${i/#$src_dir/$target_dir}

   if [[ "$i" == *.txt ]]; then
      hdfs_cmd="hdfs dfs -mv $i ${target_folder%/*}"
      echo "$hdfs_cmd"
      $hdfs_cmd
   # Exclude tmp files being written by flume
   # Only grab directories to be created
   elif [[ "$i" != *.tmp ]]; then
      # Create target directory
      hdfs_cmd="hdfs dfs -mkdir -p $target_folder"
      echo "$hdfs_cmd"
      $hdfs_cmd
   fi
done
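The script can also be exercised on its own from an edge node before involving Oozie, and then uploaded next to the workflow.xml (paths per the usage example in the script):

chmod +x archive_files.sh
./archive_files.sh /data/landing/app_stream /data/archive/app
hdfs dfs -put -f archive_files.sh /app/archive_files/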

With all of the components in place on HDFS and the properties specified within the Oozie Job Executor step, we can now execute our job and have files 'archived' from ${SRC_DIR} to ${ARCHIVE_DIR}. Just make sure ${SRC_DIR} and ${ARCHIVE_DIR} are defined as parameters of the PDI job, or explicitly specify the property values in the step.
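If the job doesn't behave as expected, the submitted workflow can be inspected with the Oozie client (using the workflow id reported for the run; the server URL is again a placeholder), and the result verified directly on HDFS against the archive directory from the example:

oozie job -oozie http://oozie-host:11000/oozie -info <workflow-id>
hdfs dfs -ls -R /data/archive/app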