Second MR job is not terminating in Hadoop

By : cody123
Source: Stackoverflow.com
Question!

I had a use case where I wanted First MR's output to second MR's Input. I achieved that using ControlJob in Hadoop but At the end of the job I am getting below mentioned Exception.

java.lang.IllegalStateException: Job in state RUNNING instead of DEFINE
    at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:294)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1288)
    at org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.submit(ControlledJob.java:335)
    at org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.run(JobControl.java:240)
    at com.hadoop.intellipaat.JoinClickImpressionDetailJob.runMRJobs(JoinClickImpressionDetailJob.java:353)
    at com.hadoop.intellipaat.JoinClickImpressionDetailJob.run(JoinClickImpressionDetailJob.java:421)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at com.hadoop.intellipaat.JoinClickImpressionDetailJob.main(JoinClickImpressionDetailJob.java:309)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

Source Code

public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new JoinClickImpressionDetailJob(), args);
        System.exit(1);
    }

    private static int runMRJobs(String[] args) {
        int result = -1;
        Configuration conf = new Configuration();
        conf.set("mapreduce.output.fileoutputformat.compress", "true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");
        conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
        conf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");

        ControlledJob mrJob1 = null;
        Job firstJob = null;
        try {
            deleteDirectory(args[2], conf);
            mrJob1 = new ControlledJob(conf);
            mrJob1.setJobName("IMPRESSION_CLICK_COMBINE_JOB");
            firstJob = mrJob1.getJob();
            result += firstMapReduceJob(args, firstJob);
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("First Job Finished=============");

        System.out.println("Second Job Started=============");

        ControlledJob mrJob2 = null;
        try {
            mrJob2 = new ControlledJob(conf);
            deleteDirectory(args[3], conf);
            mrJob2.addDependingJob(mrJob1);
            mrJob2.setJobName("IMPRESSION_CLICK_COMBINE_JOB1");
            Job job2 = mrJob2.getJob();
            result += secondMapReduceJob(args, job2);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Second Job Finished=============");

        JobControl jobControl = new JobControl("Click-Impression-aggregator");
        jobControl.addJob(mrJob1);
        jobControl.addJob(mrJob2);
        jobControl.run();
        return result;
    }

    private static int secondMapReduceJob(String[] args, Job job2) throws IOException, InterruptedException, ClassNotFoundException {
        long startTime = System.currentTimeMillis();

        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setJarByClass(JoinClickImpressionDetailJob.class);

        job2.setInputFormatClass(TextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);

        job2.setReducerClass(ImpressionAndClickReducer.class);

        FileInputFormat.setInputDirRecursive(job2, true);
        FileInputFormat.addInputPath(job2, new Path(args[2]));
        job2.setMapperClass(ImpressionClickMapper.class);

        FileOutputFormat.setOutputPath(job2, new Path(args[3]));
        job2.setNumReduceTasks(8);
        job2.setPartitionerClass(ClickNonClickPartitioner.class);
        System.out.println("Time taken : " + (System.currentTimeMillis() - startTime) / 1000);
        return job2.waitForCompletion(true) ? 1 : 0;
    }

    private static int firstMapReduceJob(String[] args, Job job) throws IOException, InterruptedException, ClassNotFoundException {

        long startTime = System.currentTimeMillis();
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setJarByClass(JoinClickImpressionDetailJob.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setReducerClass(ImpressionClickReducer.class);

        FileInputFormat.setInputDirRecursive(job, true);
        /**
         * Here directory of impressions will be present
         */
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, ImpressionMapper.class);
        /**
         * Here directory of clicks will be present
         */
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, ClickMapper.class);

        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        job.setNumReduceTasks(10);

        job.setPartitionerClass(TrackerPartitioner.class);

        System.out.println("Time taken : " + (System.currentTimeMillis() - startTime) / 1000);
        return job.waitForCompletion(true) ? 1 : 0;
    }

    private static void deleteDirectory(String args, Configuration conf) throws IOException {
        Path p = new Path(args);
        FileSystem fs = FileSystem.get(conf);
        fs.exists(p);
        fs.delete(p, true);
    }

    @Override
    public int run(String[] args) throws Exception {
        return runMRJobs(args);
    }

Complete Code : https://github.com/ragnar-lothbrok/hadoop-demo/blob/master/src/main/java/com/hadoop/intellipaat/JoinClickImpressionDetailJob.java

By : cody123


Answers

Update your code as below. You can achieve this without Control job.

private static int runMRJobs(String[] args) {

        int result = -1;      

 Configuration conf = getConf();

        conf.set("mapreduce.output.fileoutputformat.compress", "true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");
        conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
        conf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");

        Job job1 = new Job(conf,"IMPRESSION_CLICK_COMBINE_JOB");

            try {
                deleteDirectory(args[2], conf);
                result += firstMapReduceJob(args, job1);
            } catch (Exception e) {
                e.printStackTrace();
            }

            System.out.println("First Job Finished=============");

            System.out.println("Second Job Started=============");

            Job job2 = new Job(conf,"IMPRESSION_CLICK_COMBINE_JOB1");
            try {
                deleteDirectory(args[3], conf);
                result += secondMapReduceJob(args, job2);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("Second Job Finished=============");
    return result;
    }

in your first job return like below

return job1.waitForCompletion(true);

in your second job return like below

return job2.waitForCompletion(true) ? 0 : 1;


I'm fairly new to VBA, but am trying to answer question to support the community. I welcome any/all corrections to this code, but I tested it. It works and it does what they asked. It is not however optimised in any way, shape, or form.

I called the initial Sheet "RawData" and the new sheet "NewData"

Option Explicit

Sub Pivot()
Dim i, m, q As Integer
i = 2
q = 3

Do While Sheets("RawData").Cells(i, 1).Value <> ""
If Sheets("NewData").Range("A1:ZZ1").Find(Sheets("RawData").Cells(i, 1), LookAT:=xlWhole) Is Nothing Then
        If Sheets("NewData").Range("A1") <> "" Then
                If Sheets("NewData").Range("B1") <> "" Then
                        Worksheets("RawData").Cells(i, 1).Copy
                        Worksheets("NewData").Activate
                        Cells(1, q).Activate
                        ActiveCell.PasteSpecial xlPasteValues
                        Worksheets("RawData").Cells(i, 2).Copy
                        Worksheets("NewData").Activate
                        Cells(2, q).Activate
                        ActiveCell.PasteSpecial xlPasteValues
                        q = q + 1
                Else
                        Worksheets("RawData").Cells(i, 1).Copy
                        Worksheets("NewData").Activate
                        Range("B1").Activate
                        m = ActiveCell.Column
                        ActiveCell.PasteSpecial xlPasteValues
                        Worksheets("RawData").Cells(i, 2).Copy
                        Worksheets("NewData").Activate
                        Cells(2, m).Activate
                        ActiveCell.PasteSpecial xlPasteValues
                End If
            Else
                          Worksheets("RawData").Cells(i, 1).Copy
                        Worksheets("NewData").Activate
                        Range("A1").Activate
                        m = ActiveCell.Column
                        ActiveCell.PasteSpecial xlPasteValues
                        Worksheets("RawData").Cells(i, 2).Copy
                        Worksheets("NewData").Activate
                        Cells(2, m).Activate
                        ActiveCell.PasteSpecial xlPasteValues
            End If


Else
Worksheets("RawData").Cells(i, 2).Copy
Worksheets("NewData").Activate
Sheets("NewData").Range("A1:ZZ1").Find(Sheets("RawData").Cells(i, 1), LookAT:=xlWhole).Activate
m = ActiveCell.Column
Worksheets("NewData").Cells(1, m).End(xlDown).Offset(1, 0).Activate
ActiveCell.PasteSpecial xlPasteValues     
End If
  i = i + 1
 Loop
End Sub


Ideally the way it works is you get the state and you extract the values from them by use deconstructors. redux works on concept of single state

For example:-

function mapStateToProps(state){
    const { auth } = state  //just taking a auth as example.
    return{
      auth
    }
}


This video can help you solving your question :)
By: admin