Tuesday, January 20, 2015

Part files in Hadoop MapReduce output

A Hadoop MapReduce job creates two outputs by default: the part-r-00000 file and a _SUCCESS marker file.

To avoid the empty part files we can use the LazyOutputFormat class:
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
With LazyOutputFormat the output file is created lazily, on the first call to write; a reducer that emits no records therefore leaves no empty part-r-00000 file behind.
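A minimal driver sketch (the class name, job name, and paths are illustrative; mapper/reducer setup is omitted):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class LazyOutputDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "lazy-output-example");
        job.setJarByClass(LazyOutputDriver.class);
        // ... set mapper, reducer, and key/value classes here ...
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Output files are created only when the first record is written,
        // so reducers that emit nothing leave no empty part files behind.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}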

If you route all of your output through MultipleOutputs.write, no default part-r-00000 files are needed, and because you never call context.write() you can suppress the default output entirely with:
job.setOutputFormatClass(NullOutputFormat.class);


Overwrite / recreate an existing output path for Hadoop / MapReduce jobs

Generally, the exception below means the output path already exists:
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/Users/outputfile already exists
    at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:117)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:937)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:896)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)

Solution:
1. Use the MultipleOutputs class to create a custom output path, for example by appending the system date (see the next section).
2. If the output path already exists, delete it first using FileSystem:

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
if (fs.exists(new Path(args[1]))) {
    fs.delete(new Path(args[1]), true);
}

   For the local file system you can use:
   FileSystem.getLocal(conf).delete(outputDir, true);
   or, for HDFS:
   Path outputDir = new Path(args[2]);
   outputDir.getFileSystem(jobConf).delete(outputDir, true);
3. Write a custom output format class that overrides the output-path handling (e.g. the existence check) and set that class on the job (a hedged sketch follows this list).
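A minimal sketch of option 3 under the new (org.apache.hadoop.mapreduce) API; it overrides checkOutputSpecs, the method visible in the stack trace above, so the existing-directory check is skipped (the class name is illustrative):

import java.io.IOException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class OverwritingTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
    @Override
    public void checkOutputSpecs(JobContext job) throws IOException {
        // Deliberately skip super.checkOutputSpecs(job), which is where
        // FileAlreadyExistsException is thrown for an existing directory.
        // The output path itself must still be set on the job as usual.
    }
}

Register it in the driver with job.setOutputFormatClass(OverwritingTextOutputFormat.class);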

Create a dynamic output directory from the reducer using MapReduce

Using the MultipleOutputs class we can create our own output directory, for example a folder path like /<output path>/<system date>/<named output>.

In the main driver, set the job output path as usual and register the named outputs (a named output must be registered before the reducer can write to it, so "Dummy", used below, is included):
MultipleOutputs.addNamedOutput(job, "Combined", TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "UnProcessed", TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "Dummy", TextOutputFormat.class, Text.class, Text.class);

Reducer class:
// The class name must differ from the base Reducer class; "DynamicOutputReducer" is illustrative.
public class DynamicOutputReducer extends Reducer<Text, TextArrayWritable, Text, Text> {
    private MultipleOutputs<Text, Text> mos;
    private String currentDate = "";

    @Override
    public void setup(Context context) {
        mos = new MultipleOutputs<Text, Text>(context);
        // Capture the current date/time once per task.
        DateFormat dateFormat = new SimpleDateFormat("yyyyMMdd-HHmmss");
        currentDate = dateFormat.format(new Date());
    }

    @Override
    public void reduce(Text key, Iterable<TextArrayWritable> values,
            Context context) throws IOException, InterruptedException {

        ArrayList<Text[]> sortedList = new ArrayList<Text[]>();
        // Divide into MsgReq and MsgResponse lists.
        for (TextArrayWritable value : values) {
            Text[] createList = (Text[]) value.toArray();
            if (!(createList[0].toString().contains("0000000000000000"))) {
                mos.write("Dummy", key, new Text(createList[0].toString()), currentDate + "/Dummy");
            }
            sortedList.add(createList);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        mos.close();
    }
}

Here:
mos.write("Dummy", key, new Text(createList[0].toString()), currentDate + "/Dummy");
In the above statement:
First argument: the named output, "Dummy"
Second argument: the key
Third argument: the value, new Text(createList[0].toString())
Fourth argument: the baseOutputPath, currentDate + "/Dummy", i.e. <sysdate>/<named output>

Result:
<job output path>/<sysdate>/Dummy-r-00000

Have a nice day...............:-)
           

Thursday, January 15, 2015

Java program to connect TIBCO JMS Queue

The following Java program connects to a TIBCO JMS queue:
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import com.google.common.io.Files;


public class TibcoJmsQueueTest {
    public static void main(String[] args) throws IOException {
        String userName = "<username>";
        try {
            // Read the password from a file rather than hard-coding it.
            String passwordFile = "<password file>";
            String password = Files.toString(new File(passwordFile),
                    Charset.forName("UTF-8")).trim();

            // Obtain a JNDI context.
            Properties env = new Properties();
            env.put(Context.INITIAL_CONTEXT_FACTORY,
                    "com.tibco.tibjms.naming.TibjmsInitialContextFactory");
            env.put(Context.PROVIDER_URL, "tcp://<server>:<portno>");
            env.put(Context.SECURITY_PRINCIPAL, userName);
            env.put(Context.SECURITY_CREDENTIALS, password);
            InitialContext jndi = new InitialContext(env);
            ConnectionFactory factory =
                    (ConnectionFactory) jndi.lookup("QueueConnectionFactory");

            try {
                Connection connection =
                        factory.createConnection(userName, password);
                Session session = connection.createSession(true,
                        Session.SESSION_TRANSACTED);

                Queue sampleQueue = (Queue) jndi.lookup("<Queue lookup name>");

                // ... create producers/consumers on the session here ...

                connection.close();
            } catch (JMSException je) {
                je.printStackTrace();
            }

        } catch (NamingException e) {
            e.printStackTrace();
        }
    }
}
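Once the queue has been looked up, messages can be received from it; a minimal sketch, assuming the session, queue, and connection from the program above (the timeout value is illustrative, and javax.jms.Message, javax.jms.MessageConsumer, and javax.jms.TextMessage must also be imported):

MessageConsumer consumer = session.createConsumer(sampleQueue);
connection.start();                   // delivery starts only after start()
Message msg = consumer.receive(5000); // wait up to 5 seconds for a message
if (msg instanceof TextMessage) {
    System.out.println(((TextMessage) msg).getText());
}
session.commit();                     // transacted session: commit to acknowledge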



Wednesday, January 14, 2015

JMS source (TIBCO) with Flume - required jars and plugin directory structure

Flume conf file: flumeconfjms.conf, without username and password:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
## JMS Source - listening on a QUEUE
a1.sources.r1.type = jms
a1.sources.r1.initialContextFactory = com.tibco.tibjms.naming.TibjmsInitialContextFactory
# connectionFactory is the connection factory JNDI name
a1.sources.r1.connectionFactory = QueueConnectionFactory
#a1.sources.r1.connectionFactory = com.tibco.tibjms.TibjmsQueueConnectionFactory
a1.sources.r1.providerURL = tcp://<servername>:<portnumber>
# destinationName is the destination queue
a1.sources.r1.destinationName = TIBCO.HADOOP.POC
a1.sources.r1.destinationType = QUEUE
a1.sources.r1.channels = c1

# Describe sink
a1.sinks.k1.type = HDFS
# hdfs.path is the HDFS target path
a1.sinks.k1.hdfs.path = /user/flume_test

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Dependency Jars:
jms.jar
jms-api-2.jar
tibcrypt.jar
tibemsd_sec.jar
tibjms.jar
tibjmsadmin.jar
tibjmsapps.jar
tibjmsufo.jar
tibrvjms.jar

Run the Flume agent:
flume-ng agent --plugins-path <plugin-parent dir>/plugins.d -n a1 -c conf -f flumeconfjms.conf

plugins.d directory structure:
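A typical layout follows the standard Flume plugins.d convention; the tibco-jms plugin directory name here is illustrative:

plugins.d/
  tibco-jms/
    lib/       - the plugin's own jars
    libext/    - dependency jars, e.g. the TIBCO jars listed above
    native/    - native libraries (.so files), if any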


JMS source with TIBCO using Flume

Without credentials configured, the source fails with:
Exception: node.AbstractConfigurationProvider: Source r1 has been removed due to an error during configuration
org.apache.flume.FlumeException: Could not lookup ConnectionFactory
        at org.apache.flume.source.jms.JMSSource.doConfigure(JMSSource.java:233)
Caused by: javax.naming.AuthenticationException: Not permitted: invalid name or password [Root exception is javax.jms.JMSSecurityException: invalid name or password]
        at com.tibco.tibjms.naming.TibjmsContext.lookup(TibjmsContext.java:670)
        at com.tibco.tibjms.naming.TibjmsContext.lookup(TibjmsContext.java:491)
        at javax.naming.InitialContext.lookup(InitialContext.java:411)
        at org.apache.flume.source.jms.JMSSource.doConfigure(JMSSource.java:230)
        ... 12 more
Caused by: javax.jms.JMSSecurityException: invalid name or password
        at com.tibco.tibjms.Tibjmsx.buildException(Tibjmsx.java:612)
        at com.tibco.tibjms.TibjmsConnection._create(TibjmsConnection.java:1394)
        at com.tibco.tibjms.TibjmsConnection.<init>(TibjmsConnection.java:4311)
        at com.tibco.tibjms.TibjmsQueueConnection.<init>(TibjmsQueueConnection.java:36)
        at com.tibco.tibjms.TibjmsxCFImpl._createImpl(TibjmsxCFImpl.java:200)
        at com.tibco.tibjms.TibjmsxCFImpl._createConnection(TibjmsxCFImpl.java:253)


Solution: add the userName and passwordFile properties to the source.
Flume conf file: flumeconfjms.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
## JMS Source - listening on a QUEUE
a1.sources.r1.type = jms
a1.sources.r1.initialContextFactory = com.tibco.tibjms.naming.TibjmsInitialContextFactory
# connectionFactory is the connection factory JNDI name
a1.sources.r1.connectionFactory = QueueConnectionFactory
#a1.sources.r1.connectionFactory = com.tibco.tibjms.TibjmsQueueConnectionFactory
a1.sources.r1.providerURL = tcp://<servername>:<portnumber>
# destinationName is the destination queue
a1.sources.r1.destinationName = TIBCO.HADOOP.POC
a1.sources.r1.destinationType = QUEUE
a1.sources.r1.channels = c1
a1.sources.r1.userName = <username>
# passwordFile is a local file containing only the password
a1.sources.r1.passwordFile = /home/user/jms-source/password.txt

# Describe sink
a1.sinks.k1.type = HDFS
# hdfs.path is the HDFS target path
a1.sinks.k1.hdfs.path = /user/flume_test

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

In flume-jms-source-1.5.0-cdh5.2.1.jar, JMSSource.java does not pass the credentials to JNDI; it is missing the statements below:
    contextProperties.setProperty(
        javax.naming.Context.SECURITY_PRINCIPAL, userName.get());
    contextProperties.setProperty(
        javax.naming.Context.SECURITY_CREDENTIALS, password.get());
Add these statements, rebuild the jar, and copy it into the Flume lib folder, overwriting (or deleting) the existing jar.

After the changes, JMSSource.java looks as follows:
   try {
      Properties contextProperties = new Properties();
      contextProperties.setProperty(
          javax.naming.Context.INITIAL_CONTEXT_FACTORY,
          initialContextFactoryName);
      contextProperties.setProperty(
          javax.naming.Context.PROVIDER_URL, providerUrl);
      contextProperties.setProperty(
          javax.naming.Context.SECURITY_PRINCIPAL, userName.get());
      contextProperties.setProperty(
          javax.naming.Context.SECURITY_CREDENTIALS, password.get());
      initialContext = initialContextFactory.create(contextProperties);
    } catch (NamingException e) {
      throw new FlumeException(String.format(
          "Could not create initial context %s provider %s",
          initialContextFactoryName, providerUrl), e);
    }

    try {
      connectionFactory = (ConnectionFactory) initialContext.
          lookup(connectionFactoryName);
    } catch (NamingException e) {
      throw new FlumeException("Could not lookup ConnectionFactory", e);
    } 
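After recompiling the patched JMSSource.java against the Flume and JMS jars, one way to put the class file back into the jar is jar's update flag; a hedged example (the package path matches the stack trace above):

jar uf flume-jms-source-1.5.0-cdh5.2.1.jar org/apache/flume/source/jms/JMSSource.class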

Command to run the Flume agent:
flume-ng agent --plugins-path <plugin directory> -n a1 -c conf -f flumeconfjms.conf

Put some messages into the TIBCO queue; the streamed events are then automatically copied into your sink (the HDFS target path).

Enjoy.............. Have a nice day..... :-) 

Thursday, January 8, 2015

Run the copyFromLocal command using Oozie

1. On a multinode cluster we cannot run the copyFromLocal command reliably from Oozie, because the source path is not available on the datanode where the task runs. The alternative, copying the files onto every datanode, takes far too much space on each one.

2. Use a Java action to copy the file from the local server to HDFS (a minimal sketch follows this list).

3. An ssh action connects to the server directly and then runs the commands there; Oozie's ssh action supports passwordless authentication.

4. Use sshpass to run ssh/scp with a password:
sudo apt-get install sshpass
sshpass -p '<password>' <ssh/scp command>

5. To connect with a password from an Oozie ssh action, we can use expect with spawn/send commands:
#!/usr/bin/expect -f
spawn ssh root@myhost
expect -exact "root@myhost's password: "
send -- "mypassword\r"
interact
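A minimal sketch of the Java action's main class for option 2, assuming the local source path and the HDFS destination are passed as the action's two arguments (the class name is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CopyToHdfs {
    public static void main(String[] args) throws Exception {
        // args[0] = local source path, args[1] = HDFS destination path
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        fs.copyFromLocalFile(new Path(args[0]), new Path(args[1]));
        fs.close();
    }
}

Note that, like the shell action, a Java action runs on an arbitrary node, so the local source path must exist on that node.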

A typical failure when the source path is missing on the node where the action runs:
Error: copyFromLocal: `/home/workspace/testmkdir.sh': No such file or directory
Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.ShellMain], exit code [1]

Run an Oozie shell action from a shell script file

1. Create a shell action node that invokes hadoop fs -mkdir to create the folder:
testmkdir.sh
#!/bin/bash
processdate=$(date +%y%m%d_%H%M)
hadoop fs -mkdir /user/TEST_$processdate

2. Copy the local testmkdir.sh file into an HDFS location.
3. Open Hue in the browser and create a new workflow with a shell action node as follows:
Shell command: the script file name
Files: the path to the script
Both paths mentioned above are HDFS paths.


workflow.xml:

<workflow-app name="shellTest" xmlns="uri:oozie:workflow:0.4">
    <start to="shelltest"/>
    <action name="shelltest">
        <shell xmlns="uri:oozie:shell-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <exec>testmkdir.sh</exec>
            <file>/user/reqFiles/testmkdir.sh#testmkdir.sh</file>
        </shell>
        <ok to="end"/>
        <error to="kill"/>
    </action>
    <kill name="kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
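To submit from the command line, a job.properties file along these lines is assumed (host names, ports, and the application path are illustrative):

nameNode=hdfs://<namenode>:8020
jobTracker=<jobtracker>:8032
oozie.wf.application.path=${nameNode}/user/<workflow dir>

oozie job -oozie http://<oozie-host>:11000/oozie -config job.properties -run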

4. Submit the Oozie workflow (for example with the command above).....Enjoy....:-)

If the script is not shipped to the container (e.g. the <file> element is missing from the action), you may see:
Error: Cannot run program "testmkdir.sh" (in directory "/home/yarn/nm/usercache/appcache/application_1417676870603_1385/container_1417676870603_1385_01_000006"): error=2, No such file or directory