Monday, May 16, 2016

Flume File Channel Problems


Error appending event to channel. Channel might be full. Consider increasing the channel capacity or make sure the sinks perform faster.
org.apache.flume.ChannelException: Commit failed due to IO error [channel=channel6]
        at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doRollback(FileChannel.java:616)
        at org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
        at org.apache.flume.source.jms.JMSSource.doProcess(JMSSource.java:258)
        at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:54)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Usable space exhaused, only 524054528 bytes remaining, required 524288026 bytes
        at org.apache.flume.channel.file.Log.rollback(Log.java:712)
        at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doRollback(FileChannel.java:614)
        ... 6 more
15/04/01 07:25:27 ERROR file.Log$BackgroundWorker: Error doing checkpoint
java.io.IOException: Usable space exhaused, only 524054528 bytes remaining, required 524288000 bytes
        at org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:985)
        at org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:968)
        at org.apache.flume.channel.file.Log.access$200(Log.java:75)
        at org.apache.flume.channel.file.Log$BackgroundWorker.run(Log.java:1183)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Observations: 
At checkpoint time, the File Channel deletes the data files whose events have all been taken.
However, the channel always keeps 2 log files per data directory, even if all their
events have been taken. For example, once all events in log-1 and log-2 have been taken and new events start going to log-3, log-1 is deleted at the next checkpoint. Until log-3 is created,
neither log-1 nor log-2 is deleted.
The interval between two consecutive checkpoints is set to 30 seconds by default, though it is configurable.
The File Channel writes out a checkpoint periodically to make restart or recovery faster. The checkpoint is written to the directory specified as the value of the checkpointDir parameter. If the channel is stopped while it is checkpointing, the checkpoint may be incomplete or corrupt. A corrupt or incomplete checkpoint could make the restart of the channel extremely slow, as the channel would need to read and replay all data files.

To avoid this problem, it is recommended to set the useDualCheckpoints parameter to true and to set backupCheckpointDir. This directory should preferably be on a different disk from the one where the original checkpoint is stored. When these parameters are set, Flume backs up the checkpoint to backupCheckpointDir as soon as it is completed. This ensures that once the channel has been in operation for a brief period (enough time for the first checkpoint to be written out and backed up), it can restart from a checkpoint even if the newest one is corrupt or incomplete, reducing the restart time drastically. The time between consecutive checkpoints is controlled by the checkpointInterval parameter.

To ensure that the channel does not write to disks with low disk space, the minimumRequiredSpace parameter can be configured. Once the disk space on a specific disk goes down to the value set by this parameter (or 1 MB, whichever is higher), the channel ceases operation. To conserve system resources and not affect performance, the channel does not check the disk space on every write, but does it periodically, maintaining counters internally to calculate the space available on disk. This makes the assumption that no other channel or process is writing to this disk, which would make the available disk space decrease faster.
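The free-space rule above can be sketched as a small standalone check. This is a minimal sketch, not Flume's code: the class DiskSpaceCheck and its methods are hypothetical, and it simply compares java.io.File#getUsableSpace against the configured minimum or 1 MB, whichever is higher. The 524288000-byte threshold (500 MB) is the "required" value reported in the log at the top of this post.

```java
import java.io.File;

// Hypothetical helper, not Flume's API: mirrors the free-space rule described above.
public class DiskSpaceCheck {

    // Floor applied by the rule: the threshold never goes below 1 MB.
    static final long ONE_MB = 1024L * 1024L;

    /** Effective threshold: minimumRequiredSpace or 1 MB, whichever is higher. */
    static long effectiveMinimum(long minimumRequiredSpace) {
        return Math.max(minimumRequiredSpace, ONE_MB);
    }

    /** True if the usable bytes reach the effective threshold. */
    static boolean hasEnoughSpace(long usableBytes, long minimumRequiredSpace) {
        return usableBytes >= effectiveMinimum(minimumRequiredSpace);
    }

    public static void main(String[] args) {
        // Data directory to check; the current directory if no argument is given.
        File dataDir = new File(args.length > 0 ? args[0] : ".");
        long usable = dataDir.getUsableSpace();
        long required = 524288000L; // 500 MB, the "required" value in the log above
        System.out.printf("usable=%d required=%d ok=%b%n",
                usable, effectiveMinimum(required), hasEnoughSpace(usable, required));
    }
}
```

With the numbers from the log (524054528 bytes usable against 524288000 required), hasEnoughSpace returns false, which is exactly the situation the channel reported.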

The File Channel is implemented as a Write Ahead Log. The channel keeps a reference count of the events in each data file that still need to be taken by the sink. Once all the events in a file are taken, the file is deleted after the next checkpoint. If you want the files to be deleted sooner, you can reduce the maximum size of a data file through the config parameter "maxFileSize" (the maximum size, in bytes, that each individual log file can grow to). By default, maxFileSize is 2146435071 bytes (about 2 GB). As an experiment, you can reduce the file size and check whether the files get deleted (each directory keeps at least 2 files even if all events have been taken).
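To make the deletion rule concrete, here is a toy model of the reference counting described above. It is a sketch under stated assumptions, not the File Channel implementation: the class LogFileCleanupModel is hypothetical, files are numbered 1, 2, 3 like log-1, log-2, log-3, and it encodes only the two rules from this post (a file is deleted at checkpoint once all its events are taken, and the two newest files are always kept).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical toy model of log-file cleanup at checkpoint time (not Flume code).
public class LogFileCleanupModel {

    // Pending (not yet taken) event count per log file id, oldest file first.
    private final TreeMap<Integer, Integer> pending = new TreeMap<>();
    private int currentFile = 0;

    /** Starts writing to a new log file, e.g. after the current one reaches maxFileSize. */
    void rollToNewFile() {
        currentFile++;
        pending.put(currentFile, 0);
    }

    /** A put appends to the current file and increments its reference count. */
    void put() {
        if (currentFile == 0) {
            rollToNewFile();
        }
        pending.merge(currentFile, 1, Integer::sum);
    }

    /** A take by the sink decrements the count of the file that held the event. */
    void take(int fileId) {
        pending.merge(fileId, -1, Integer::sum);
    }

    /** At checkpoint: delete fully taken files, but always keep the two newest files. */
    List<Integer> checkpoint() {
        List<Integer> deleted = new ArrayList<>();
        List<Integer> ids = new ArrayList<>(pending.keySet());
        for (int i = 0; i < ids.size() - 2; i++) {
            int id = ids.get(i);
            if (pending.get(id) == 0) {
                pending.remove(id);
                deleted.add(id);
            }
        }
        return deleted;
    }

    List<Integer> remainingFiles() {
        return new ArrayList<>(pending.keySet());
    }

    public static void main(String[] args) {
        LogFileCleanupModel ch = new LogFileCleanupModel();
        ch.put();            // event in log-1
        ch.rollToNewFile();
        ch.put();            // event in log-2
        ch.take(1);
        ch.take(2);          // all events taken
        System.out.println("deleted " + ch.checkpoint() + " kept " + ch.remainingFiles());
        ch.rollToNewFile();  // log-3 created
        System.out.println("deleted " + ch.checkpoint() + " kept " + ch.remainingFiles());
    }
}
```

Running main prints `deleted [] kept [1, 2]` and then `deleted [1] kept [2, 3]`: log-1 survives the first checkpoint and is removed only once log-3 exists, matching the observation above.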


Monday, May 2, 2016

Kafka issues - Windows

You may run into the following issues when running Kafka locally on Windows.

1. WARN Error while fetching metadata with correlation id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
         Initially the Kafka setup worked: the producer could publish messages and the consumer could consume them. Then I got max offset reached error messages.
         So I manually deleted all the kafka-logs, zookeeper-logs and topic folders. After that, publishing a message with the producer failed with "WARN Error while fetching metadata with correlation id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)".
         I tried many ways to fix this, and finally found the solution:
              In the server.properties file, change the 2 parameters below.
                  host.name=localhost
                  advertised.host.name=localhost
Then restart the Kafka server and the producer. Now send messages with the producer, and the consumer automatically consumes them.
 
2. Always create the kafka-logs and controller log files inside the main Kafka folder by setting the log directory parameters (log.dirs in server.properties for the kafka-logs data, and kafka.logs.dir for controller.log).

3. Run Kafka on Windows as follows:
             step1 : start the ZooKeeper server
                  .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
             step2 : start the Kafka server
                  .\bin\windows\kafka-server-start.bat .\config\server.properties
             step3 : list the topics created in the Kafka broker
                 .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
             step4 : create the topic
                  .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
             step5 : run the producer for the topic
                  .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
             step6 : run the consumer for the topic
                 .\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning
      => the consumer reads all the messages, starting from the first one.
                  .\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
      => the consumer reads only new messages, starting from the latest offset (recent messages).
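The fix in issue 1 comes down to two keys in server.properties, so a quick check of the file can save a restart cycle. A minimal sketch: KafkaHostSettingsCheck is hypothetical, it only reports whether host.name and advertised.host.name are set, and these are the older-style broker settings used in this post (newer Kafka releases replace them with listeners and advertised.listeners).

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for the host settings from issue 1 (not part of Kafka).
public class KafkaHostSettingsCheck {

    // The two settings changed in issue 1 above.
    static final String[] HOST_KEYS = {"host.name", "advertised.host.name"};

    /** Returns the host keys that are missing or blank. */
    static List<String> missingHostSettings(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : HOST_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Broker config path; defaults to the file used in step2 above.
        String path = args.length > 0 ? args[0] : "config/server.properties";
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(Paths.get(path))) {
            props.load(reader);
        }
        List<String> missing = missingHostSettings(props);
        System.out.println(missing.isEmpty() ? "host settings present" : "missing: " + missing);
    }
}
```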

Scala IDE Issues

1. Error: Cross compiled with an incompatible version of Scala (2.10.0) while building a Scala application.
  Solution: You may have installed a newer version of Scala and compiled against it, but Scala IDE also provides a built-in Scala 2.10.0 library, so the project sees 2 Scala versions.
  Go to Project Build Path -> Libraries -> select the Scala Library (currently Scala 2.11.7 in my Eclipse) -> Edit, and choose the container that matches the Scala version your libraries were compiled with.
       You can see 4 types of Scala Library Containers:
               - Latest 2.11 bundle (dynamic)
               - Latest 2.10 bundle (dynamic)
               - Fixed Scala Library container (2.11.7)
               - Fixed Scala Library container (2.10.6)

2. After building the project, the Java library automatically changes back to version 1.6, even if we changed it to 1.8.
    Solution: The pom.xml generated for the maven-scala project contains the properties below:
    <maven.compiler.source>1.6</maven.compiler.source>
    <maven.compiler.target>1.6</maven.compiler.target>
   These parameters point to 1.6, so after each build the project preferences change back to Java 1.6.
To fix the problem, change the version numbers.
For example, to point the project to Java 1.8:
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>

3. Error: Could not find or load main class com.test.scala.CountWord
   Solution: By default the Scala compiler targets jvm-1.6. If you change the Scala compiler target from jvm-1.6 to jvm-1.8, you get the "Could not find or load main class" error.
        Right-click the project -> Build Path -> Scala Compiler, and change the target back to the default jvm-1.6; this solves the problem.
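One way to confirm which Java version a build actually targets (items 2 and 3 above) is to read the class-file header: a target of 1.6 gives major version 50, 1.7 gives 51 and 1.8 gives 52. A minimal sketch; the class ClassVersionCheck is hypothetical, and the example path target/classes/com/test/scala/CountWord.class is an assumption based on Maven's default output layout.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical helper: reports which Java release a compiled .class file targets.
public class ClassVersionCheck {

    /** Maps a class-file major version to a Java release (50 = 1.6, 51 = 1.7, 52 = 1.8). */
    static String javaRelease(int majorVersion) {
        switch (majorVersion) {
            case 50: return "1.6";
            case 51: return "1.7";
            case 52: return "1.8";
            default: return "major version " + majorVersion;
        }
    }

    /** Reads the class-file header: u4 magic 0xCAFEBABE, u2 minor_version, u2 major_version. */
    static int majorVersion(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        if (data.readInt() != 0xCAFEBABE) {
            throw new IOException("not a class file");
        }
        data.readUnsignedShort();        // minor_version, not needed here
        return data.readUnsignedShort(); // major_version
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java ClassVersionCheck <path to .class file>");
            return;
        }
        try (InputStream in = Files.newInputStream(Paths.get(args[0]))) {
            System.out.println(args[0] + " targets Java " + javaRelease(majorVersion(in)));
        }
    }
}
```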

Friday, April 29, 2016

Sublime Text for Java, Hive and all other formats, with colors like Notepad++

Syntax highlighting for Hive and Pig in Sublime Text, like Notepad++.

First download Sublime Text (if you are not an administrator, download the portable version). Then follow these steps if you are interested (you can skip steps 1-3 if you already have the Package Control plugin installed):
      1.      Go to https://packagecontrol.io/installation
      2.      Follow the manual installation steps (the automated script timed out for me; I assume a proxy filter is not letting it through)
      3.      Afterwards you might be prompted a couple of times to restart Sublime Text as various missing dependencies get installed
      4.      Once that's done, go to Preferences > Package Control
      5.      Select Install Package (6th item from the top, or use type-ahead)
      6.      You might then see a delay of a few seconds and a “package repository updating” message in the status bar
      7.      A dialog box appears with the packages available for installation: search for Apache Pig and Apache Hive and install them

Scala IDE installation with maven

1. Download Scala IDE from http://scala-ide.org/
2. Install Scala IDE by running scala.exe.
3. When opening Scala IDE you may run into JVM issues. To fix this, add JAVA_HOME and PATH to the environment variables.
4. To integrate Scala with Maven, follow the steps below:
        Open Eclipse --> Help --> Install New Software --> Add Repository
                              Maven for Scala - http://alchim31.free.fr/m2e-scala/update-site
     Once you have selected the categories, click Next --> Next --> Finish.
     Restart Eclipse.
     Create a new Maven project by selecting File -> New -> Project -> Maven Project
   Sometimes the Scala archetype does not appear. To solve this, add the remote catalog to the Maven archetypes manually:
    Window -> Preferences -> Maven -> Archetypes -> Add Remote Catalog
Then the Scala archetype appears; select it and create the Scala Maven project.
Run the Scala project and enjoy :-)

For your reference:
eclipse.ini configuration file (the -vm option must come before -vmargs, because everything after -vmargs is passed directly to the JVM):
-startup
plugins/org.eclipse.equinox.launcher_1.3.0.v20140415-2008.jar
--launcher.library
plugins/org.eclipse.equinox.launcher.win32.win32.x86_64_1.1.200.v20150204-1316
-vm
C:\Program Files (x86)\Java\jdk1.7.0_45\bin\javaw.exe
-vmargs
-Xmx2G
-Xms200m
-XX:MaxPermSize=384m

Another eclipse.ini example (from an Android ADT bundle):
-startup
plugins/org.eclipse.equinox.launcher_1.3.0.v20120522-1813.jar
--launcher.library
plugins/org.eclipse.equinox.launcher.win32.win32.x86_1.1.200.v20120522-1813
-product
com.android.ide.eclipse.adt.package.product
--launcher.XXMaxPermSize
256M
-showsplash
com.android.ide.eclipse.adt.package.product
--launcher.XXMaxPermSize
256m
--launcher.defaultAction
openFile
-vm
C:\Program Files\Java\jdk1.7.0_25\bin\javaw.exe
-vmargs
-Dosgi.requiredJavaVersion=1.6
-Xms40m
-Xmx768m
-Declipse.buildId=v21.0.0-531062

Apache Spark installation on windows

Download Scala IDE to run Scala applications: http://scala-ide.org/

Now let's look at the installation steps:
  • Install Java 7 or later. Set JAVA_HOME and the PATH variable as environment variables.
  • Download and install Scala 2.10. Set SCALA_HOME and add %SCALA_HOME%\bin to the PATH variable in environment variables. To test whether Scala is installed, run the following command:
  • Next comes Spark. Spark can be installed in two ways:
    •  Building Spark using SBT
    •  Using a prebuilt Spark package

 Building Spark with SBT:
  • Download and install SBT. Set SBT_HOME and the PATH variable in environment variables.
  • Download the source code from the Spark website for any of the Hadoop versions.
  • Run the sbt assembly command to build the Spark package.
  • You also need to set the Hadoop version while building, as follows:
  •      sbt -Pyarn -Phadoop-2.3 assembly

Using the Spark prebuilt package:
  • Choose a Spark package prebuilt for Hadoop, e.g. "Prebuilt for Hadoop 2.3/2.4 or later". Download and extract it to any drive, e.g. D:\spark-1.2.1-bin-hadoop2.3
  • Set SPARK_HOME and add %SPARK_HOME%\bin to PATH in environment variables.
  • Run the following command on the command line: spark-shell
  • You'll get an error for winutils.exe:
      Although we aren't using Hadoop with Spark, Spark still checks for the HADOOP_HOME variable in its configuration. To overcome this error, download winutils.exe and place it in any location (e.g. D:\winutils\bin\winutils.exe).
  • Set HADOOP_HOME = D:\winutils in environment variables.
  • Now re-run the command "spark-shell"; you'll see the Scala shell.
  • Sometimes you may get an error because the \tmp\hive folder does not have read and write permissions, and as a result you are unable to create the SQLContext. To fix this issue:
          From the command line, go to the winutils folder and run: bin\winutils.exe chmod 777 /tmp/hive
               On an office workstation you may not have permission to change this. But if your installation is on the D: drive, you can grant access to /tmp/hive there.
  • For the Spark UI, open http://localhost:4040/ in a browser.
  • To test the setup, you can run an example:
  • It will execute the program and return the result:
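Since the winutils.exe error comes from a missing or wrong HADOOP_HOME, a quick pre-flight check can confirm the layout described above (HADOOP_HOME=D:\winutils with the executable in D:\winutils\bin). A sketch; WinutilsCheck is hypothetical and only checks that the file exists, not that it matches your Hadoop version.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical pre-flight check for the winutils.exe setup described above.
public class WinutilsCheck {

    /** Expected location: %HADOOP_HOME%\bin\winutils.exe. */
    static Path winutilsPath(String hadoopHome) {
        return Paths.get(hadoopHome, "bin", "winutils.exe");
    }

    /** Returns a description of the problem, or null if the setup looks fine. */
    static String problem(String hadoopHome) {
        if (hadoopHome == null || hadoopHome.trim().isEmpty()) {
            return "HADOOP_HOME is not set";
        }
        Path exe = winutilsPath(hadoopHome);
        if (!Files.isRegularFile(exe)) {
            return "winutils.exe not found at " + exe;
        }
        return null;
    }

    public static void main(String[] args) {
        String problem = problem(System.getenv("HADOOP_HOME"));
        System.out.println(problem == null ? "winutils setup looks fine" : problem);
    }
}
```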
Enjoy the installation process :-)



Tuesday, October 13, 2015

Canary test failed to establish a connection or a client session to the ZooKeeper



WARN ZooKeeperHiveLockManager: Unexpected ZK exception when creating parent node /hive_zookeeper_namespace_hive
org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hive_zookeeper_namespace_hive
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
        at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:783)
        at org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager.setContext(ZooKeeperHiveLockManager.java:120)
        at org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager.getLockManager(DummyTxnManager.java:71)
        at org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager.acquireLocks(DummyTxnManager.java:100)
        at org.apache.hadoop.hive.ql.Driver.acquireReadWriteLocks(Driver.java:918)
        at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1128)
        at org.apache.hadoop.hive.ql.Driver.run(Driver.java:962)
        at org.apache.hadoop.hive.ql.Driver.run(Driver.java:957)
        at org.apache.hive.service.cli.operation.SQLOperation.runInternal(SQLOperation.java:145)
        at org.apache.hive.service.cli.operation.SQLOperation.access$000(SQLOperation.java:69)
        at org.apache.hive.service.cli.operation.SQLOperation$1$1.run(SQLOperation.java:200)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)

Solution: restart the cluster, or restart the ZooKeeper and Hive services.
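Before restarting services, it can help to confirm whether ZooKeeper is answering at all. A minimal sketch using ZooKeeper's `ruok` four-letter command, to which a healthy server replies `imok`; the class ZkRuok is hypothetical, the default localhost:2181 is an assumption, and newer ZooKeeper releases only answer four-letter commands that are listed in 4lw.commands.whitelist.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical health check, not part of ZooKeeper or Hive.
public class ZkRuok {

    /** Sends "ruok" and returns true only if the server answers "imok" within the timeout. */
    static boolean isOk(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            socket.setSoTimeout(timeoutMillis); // bound the read as well as the connect
            OutputStream out = socket.getOutputStream();
            out.write("ruok".getBytes(StandardCharsets.US_ASCII));
            out.flush();

            // Read up to 4 bytes; the server closes the connection after replying.
            InputStream in = socket.getInputStream();
            byte[] reply = new byte[4];
            int total = 0;
            while (total < reply.length) {
                int n = in.read(reply, total, reply.length - total);
                if (n < 0) {
                    break;
                }
                total += n;
            }
            return "imok".equals(new String(reply, 0, total, StandardCharsets.US_ASCII));
        } catch (IOException e) {
            return false; // refused, timed out or closed early: treat as not healthy
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 2181;
        boolean ok = isOk(host, port, 3000);
        System.out.println(host + ":" + port + (ok ? " answered imok" : " did not answer imok"));
    }
}
```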



Tuesday, January 20, 2015

part files in Hadoop Map Reduce output

A Hadoop MapReduce job creates 2 kinds of output files: part-r-00000 (one per reducer) and _SUCCESS.

To avoid empty part files, we can use the LazyOutputFormat class:
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
LazyOutputFormat creates a part file only when the first record is written to it, so a reducer that writes nothing leaves no empty part-r-00000 file behind.
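The idea behind LazyOutputFormat can be illustrated without Hadoop: defer creating the file until the first record arrives. A minimal sketch with plain java.nio; the class LazyFileWriter is hypothetical and only mimics the behaviour, it is not Hadoop's implementation. Records are written as key, tab, value, the same layout TextOutputFormat uses by default.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration of lazy output: the file exists only after the first write.
public class LazyFileWriter implements Closeable {

    private final Path target;
    private Writer writer; // stays null until the first record is written

    LazyFileWriter(Path target) {
        this.target = target;
    }

    void write(String key, String value) throws IOException {
        if (writer == null) {
            // First record: only now is the file created.
            writer = Files.newBufferedWriter(target, StandardCharsets.UTF_8);
        }
        writer.write(key + "\t" + value + "\n");
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("lazy-demo");
        try (LazyFileWriter empty = new LazyFileWriter(dir.resolve("part-r-00000"))) {
            // this "reducer" writes no records
        }
        try (LazyFileWriter used = new LazyFileWriter(dir.resolve("part-r-00001"))) {
            used.write("key", "value");
        }
        System.out.println("part-r-00000 exists: " + Files.exists(dir.resolve("part-r-00000")));
        System.out.println("part-r-00001 exists: " + Files.exists(dir.resolve("part-r-00001")));
    }
}
```

Running main prints `part-r-00000 exists: false` and `part-r-00001 exists: true`: the writer that never received a record leaves nothing on disk.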

If you don't need any part-r-00000 files because all output goes through MultipleOutputs.write, you can use
job.setOutputFormatClass(NullOutputFormat.class);
Since you never call context.write(), NullOutputFormat lets you ignore the default output files.


Overwrite / create the existing output path for Hadoop jobs or MapReduce



When we get the exception below, it means the output path already exists.
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/Users/outputfile already exists
    at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:117)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:937)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:896)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)

Solution:
1. Use the MultipleOutputs class to create a custom output path by adding the system date.
2. If the output directory exists, delete it using FileSystem before submitting the job:
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path outputDir = new Path(args[1]);
    if (fs.exists(outputDir)) {
      fs.delete(outputDir, true); // true = delete recursively
    }
    For the local file system you can use:
        FileSystem.getLocal(conf).delete(outputDir, true);
    or, for HDFS (the file system the path belongs to):
        outputDir.getFileSystem(conf).delete(outputDir, true);
3. Write a custom output format class that changes the output path, or overrides checkOutputSpecs (where the exception above is thrown), and set it as the job's output format.
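For a local run, the recursive delete in option 2 can also be done with plain java.nio, without Hadoop on the classpath. A sketch; the class DeleteOutputDir is hypothetical, and it deletes the deepest paths first so every directory is already empty when it is removed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical local-only counterpart of fs.delete(outputDir, true).
public class DeleteOutputDir {

    /** Deletes dir and everything below it; returns false if dir did not exist. */
    static boolean deleteRecursively(Path dir) {
        if (!Files.exists(dir)) {
            return false;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order visits children before their parent directory.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return true;
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("usage: java DeleteOutputDir <output directory>");
            return;
        }
        Path output = Paths.get(args[0]);
        System.out.println(deleteRecursively(output) ? "deleted " + output : output + " did not exist");
    }
}
```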

create dynamic output directory from reducer using Map Reduce

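Using the MultipleOutputs class, we can create our own output directory.
For example, create the folder path as /<output path>/<System date>/<NamedOutput>

In the main driver we need to set the actual output path and register the named outputs:
FileOutputFormat.setOutputPath(job, new Path(args[1]));
MultipleOutputs.addNamedOutput(job, "Combined", TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "UnProcessed", TextOutputFormat.class, Text.class, Text.class);
// the reducer below writes to the "Dummy" named output, so it must be registered too
MultipleOutputs.addNamedOutput(job, "Dummy", TextOutputFormat.class, Text.class, Text.class);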

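Reducer Class:
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

// Renamed from "Reducer": a class cannot extend a class with its own simple name.
// TextArrayWritable is the author's ArrayWritable subclass holding Text values.
public class DatedOutputReducer extends Reducer<Text, TextArrayWritable, Text, Text> {
  private MultipleOutputs<Text, Text> mos;
  private String currentDate = "";

  @Override
  protected void setup(Context context) {
    mos = new MultipleOutputs<Text, Text>(context);
    // current date and time, used as the output sub-directory name
    DateFormat dateFormat = new SimpleDateFormat("yyyyMMdd-HHmmss");
    currentDate = dateFormat.format(new Date());
  }

  @Override
  protected void reduce(Text key, Iterable<TextArrayWritable> values, Context context)
      throws IOException, InterruptedException {
    ArrayList<Text[]> sortedList = new ArrayList<Text[]>();
    // divided into MsgReq and MsgResponse lists
    for (TextArrayWritable value : values) {
      Text[] createList = (Text[]) value.toArray();
      if (!createList[0].toString().contains("0000000000000000")) {
        // written under <output path>/<currentDate>/Dummy-r-NNNNN
        mos.write("Dummy", key, new Text(createList[0].toString()), currentDate + "/Dummy");
      }
      sortedList.add(createList);
    }
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    mos.close(); // flush and close all named-output writers
  }
}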

Here:
mos.write("Dummy", key, new Text(createList[0].toString()), currentDate + "/Dummy");
In the above statement:
First argument: namedOutput is "Dummy"
Second argument: key is key
Third argument: value is new Text(createList[0].toString())
Fourth argument: baseOutputPath is currentDate + "/Dummy", i.e. <sysdate>/<NamedOutput>

Result:
sysdate/Dummy-r-00000
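The file name in the result above can be reproduced by hand, which helps when checking what a job produced. A sketch assuming the usual reduce-side naming (baseOutputPath, then "-r-", then a five-digit partition number); DatedOutputName is a hypothetical helper, and the date pattern is the one used in setup() above.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical helper reproducing the output naming shown in the result above.
public class DatedOutputName {

    /** baseOutputPath as built in the reducer: <date>/<namedOutput>. */
    static String baseOutputPath(Date when, String namedOutput, TimeZone zone) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMdd-HHmmss");
        fmt.setTimeZone(zone);
        return fmt.format(when) + "/" + namedOutput;
    }

    /** Reduce-side file name: baseOutputPath + "-r-" + five-digit partition number. */
    static String reduceOutputFile(String baseOutputPath, int partition) {
        return String.format("%s-r-%05d", baseOutputPath, partition);
    }

    public static void main(String[] args) {
        String base = baseOutputPath(new Date(), "Dummy", TimeZone.getDefault());
        System.out.println(reduceOutputFile(base, 0));
    }
}
```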

Have a nice day...............:-)
           

Thursday, January 15, 2015

Java program to connect TIBCO JMS Queue

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class TibcoQueueTest {
  public static void main(String[] args) throws IOException {
    String userName = "<username>";
    // Read the password from a file instead of hard-coding it
    String passwordFile = "<password file>";
    String password = new String(Files.readAllBytes(Paths.get(passwordFile)),
        StandardCharsets.UTF_8).trim();

    // JNDI properties for the TIBCO EMS naming service
    Properties env = new Properties();
    env.put(Context.INITIAL_CONTEXT_FACTORY,
        "com.tibco.tibjms.naming.TibjmsInitialContextFactory");
    env.put(Context.PROVIDER_URL, "tcp://<server>:<portno>");
    env.put(Context.SECURITY_PRINCIPAL, userName);
    env.put(Context.SECURITY_CREDENTIALS, password);

    try {
      InitialContext jndi = new InitialContext(env);
      ConnectionFactory factory = (ConnectionFactory) jndi.lookup("QueueConnectionFactory");
      Queue queue = (Queue) jndi.lookup("<Queue lookup name>");

      Connection connection = factory.createConnection(userName, password);
      try {
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        System.out.println("Connected to queue " + queue.getQueueName());
        session.close();
      } finally {
        connection.close(); // always release the connection
      }
    } catch (NamingException e) {
      e.printStackTrace();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
}



Wednesday, January 14, 2015

JMS source TIBCO with flume - required jars and plugin directory structure

Flume conf file: flumeconfjms.conf (without username and password)
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
## JMS Source - listening on a QUEUE
a1.sources.r1.type = jms
a1.sources.r1.initialContextFactory = com.tibco.tibjms.naming.TibjmsInitialContextFactory
# JNDI name of the connection factory
a1.sources.r1.connectionFactory = QueueConnectionFactory
#a1.sources.r1.connectionFactory = com.tibco.tibjms.TibjmsQueueConnectionFactory
a1.sources.r1.providerURL = tcp://<servername>:<portnumber>
# name of the destination queue
a1.sources.r1.destinationName = TIBCO.HADOOP.POC
a1.sources.r1.destinationType = QUEUE
a1.sources.r1.channels = c1

# Describe sink
a1.sinks.k1.type = HDFS
# HDFS path the events are written to
a1.sinks.k1.hdfs.path = /user/flume_test

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Dependency Jars:
jms.jar
jms-api-2.jar
tibcrypt.jar
tibemsd_sec.jar
tibjms.jar
tibjmsadmin.jar
tibjmsapps.jar
tibjmsufo.jar
tibrvjms.jar

Run Flume:
flume-ng agent --plugins-path <plugin-parent dir>/plugins.d -n a1 -c conf -f flumeconfjms.conf

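plugins.d directory structure (Flume's standard plugin layout; <plugin-name> is any directory name):
<plugin-parent dir>/plugins.d/
    <plugin-name>/
        lib/       the plugin jar itself
        libext/    the plugin's dependency jars, e.g. the TIBCO jars listed above
        native/    native libraries, if any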

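A missing dependency jar is an easy mistake with this setup. A small sketch that checks a plugin's libext directory for the jars listed above; PluginJarCheck is hypothetical, and it assumes the jars are placed in libext, which is where Flume's plugin layout expects dependency jars.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical check that the TIBCO dependency jars are present in a plugin's libext directory.
public class PluginJarCheck {

    // The dependency jars listed in this post.
    static final List<String> REQUIRED_JARS = Arrays.asList(
            "jms.jar", "jms-api-2.jar", "tibcrypt.jar", "tibemsd_sec.jar", "tibjms.jar",
            "tibjmsadmin.jar", "tibjmsapps.jar", "tibjmsufo.jar", "tibrvjms.jar");

    /** Returns the required jar names that are not regular files in the given directory. */
    static List<String> missingJars(Path libextDir, List<String> required) {
        List<String> missing = new ArrayList<>();
        for (String jar : required) {
            if (!Files.isRegularFile(libextDir.resolve(jar))) {
                missing.add(jar);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("usage: java PluginJarCheck <plugins.d/<plugin-name>/libext>");
            return;
        }
        List<String> missing = missingJars(Paths.get(args[0]), REQUIRED_JARS);
        System.out.println(missing.isEmpty() ? "all jars present" : "missing: " + missing);
    }
}
```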

JMS Source with TIBCO by using Flume

Exception : node.AbstractConfigurationProvider: Source r1 has been removed due to an error during configuration
org.apache.flume.FlumeException: Could not lookup ConnectionFactory
        at org.apache.flume.source.jms.JMSSource.doConfigure(JMSSource.java:233)
Caused by: javax.naming.AuthenticationException: Not permitted: invalid name or password [Root exception is javax.jms.JMSSecurityException: invalid name or password]
        at com.tibco.tibjms.naming.TibjmsContext.lookup(TibjmsContext.java:670)
        at com.tibco.tibjms.naming.TibjmsContext.lookup(TibjmsContext.java:491)
        at javax.naming.InitialContext.lookup(InitialContext.java:411)
        at org.apache.flume.source.jms.JMSSource.doConfigure(JMSSource.java:230)
        ... 12 more
Caused by: javax.jms.JMSSecurityException: invalid name or password
        at com.tibco.tibjms.Tibjmsx.buildException(Tibjmsx.java:612)
        at com.tibco.tibjms.TibjmsConnection._create(TibjmsConnection.java:1394)
        at com.tibco.tibjms.TibjmsConnection.<init>(TibjmsConnection.java:4311)
        at com.tibco.tibjms.TibjmsQueueConnection.<init>(TibjmsQueueConnection.java:36)
        at com.tibco.tibjms.TibjmsxCFImpl._createImpl(TibjmsxCFImpl.java:200)
        at com.tibco.tibjms.TibjmsxCFImpl._createConnection(TibjmsxCFImpl.java:253)


Solution:
Flume conf file : flumeconfjms.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
## JMS Source - listening on a QUEUE
a1.sources.r1.type = jms
a1.sources.r1.initialContextFactory = com.tibco.tibjms.naming.TibjmsInitialContextFactory
# JNDI name of the connection factory
a1.sources.r1.connectionFactory = QueueConnectionFactory
#a1.sources.r1.connectionFactory = com.tibco.tibjms.TibjmsQueueConnectionFactory
a1.sources.r1.providerURL = tcp://<servername>:<portnumber>
# name of the destination queue
a1.sources.r1.destinationName = TIBCO.HADOOP.POC
a1.sources.r1.destinationType = QUEUE
a1.sources.r1.channels = c1
a1.sources.r1.userName = <username>
# local path of the file that contains the password
a1.sources.r1.passwordFile = /home/user/jms-source/password.txt

# Describe sink
a1.sinks.k1.type = HDFS
# HDFS path the events are written to
a1.sinks.k1.hdfs.path = /user/flume_test

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

In flume-jms-source-1.5.0-cdh5.2.1.jar, JMSSource.java does not contain the statements below:
      // userName and password are Optional values in JMSSource; set them only when configured
      if (userName.isPresent()) {
        contextProperties.setProperty(
            javax.naming.Context.SECURITY_PRINCIPAL, userName.get());
      }
      if (password.isPresent()) {
        contextProperties.setProperty(
            javax.naming.Context.SECURITY_CREDENTIALS, password.get());
      }
Add these statements, rebuild the jar, and copy it into the flume lib folder, overwriting (or deleting) the existing jar.

After the changes, JMSSource.java looks as follows:
    try {
      Properties contextProperties = new Properties();
      contextProperties.setProperty(
          javax.naming.Context.INITIAL_CONTEXT_FACTORY,
          initialContextFactoryName);
      contextProperties.setProperty(
          javax.naming.Context.PROVIDER_URL, providerUrl);
      // Added: pass the credentials to the JNDI lookup as well, so the
      // ConnectionFactory lookup on a secured TIBCO EMS server succeeds
      if (userName.isPresent()) {
        contextProperties.setProperty(
            javax.naming.Context.SECURITY_PRINCIPAL, userName.get());
      }
      if (password.isPresent()) {
        contextProperties.setProperty(
            javax.naming.Context.SECURITY_CREDENTIALS, password.get());
      }
      initialContext = initialContextFactory.create(contextProperties);
    } catch (NamingException e) {
      throw new FlumeException(String.format(
          "Could not create initial context %s provider %s",
          initialContextFactoryName, providerUrl), e);
    }

    try {
      connectionFactory = (ConnectionFactory) initialContext.
          lookup(connectionFactoryName);
    } catch (NamingException e) {
      throw new FlumeException("Could not lookup ConnectionFactory", e);
    }

Command to run the Flume agent:
flume-ng agent --plugins-path <plugin directory> -n a1 -c conf -f flumeconfjms.conf

Put some messages into the TIBCO queue; the streamed files are then automatically copied into your sink (the HDFS target path).

Enjoy.............. Have a nice day..... :-) 

Thursday, January 8, 2015

Run copyFromLocal command by using Oozie

1. On a multi-node cluster we cannot run the copyFromLocal command through Oozie,
because the source path is not available on the data node where the task runs.
The alternative, copying the files to every data node, takes too much space on each of them.

2. Use a Java action to copy the file from the local server to HDFS.
Javacode

3. An ssh action connects to the server directly and then runs the commands there. Oozie's ssh action requires passwordless (key-based) authentication.

4. Use sshpass to run ssh or scp with a password:
sudo apt-get install sshpass
sshpass -p '<password>' <ssh/scp command>

5. To connect with a password from an ssh action in Oozie, we can use the expect and send commands:
#!/usr/bin/expect -f
spawn ssh root@myhost
expect -exact "root@myhost's password: "
send -- "mypassword\r"
interact

Error: copyFromLocal: `/home/workspace/testmkdir.sh': No such file or directory
Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.ShellMain], exit code [1]

Run oozie shell action from the shell script File

1. Create a shell action node that invokes the hadoop fs command to create a folder with mkdir.
testmkdir.sh
#!/bin/bash
processdate=$(date +%y%m%d_%H%M)
hadoop fs -mkdir /user/TEST_$processdate

2. Copy the local testmkdir.sh file to an HDFS location.
3. Open Hue in the browser and create a new workflow with a shell action node as follows:
Shell command: the script file name
Files: the path of the script file
The paths above are HDFS paths.


Workflow.xml

<workflow-app name="shellTest" xmlns="uri:oozie:workflow:0.4">
    <start to="shelltest"/>
    <action name="shelltest">
        <shell xmlns="uri:oozie:shell-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <exec>testmkdir.sh</exec>
            <file>/user/reqFiles/testmkdir.sh#testmkdir.sh</file>
        </shell>
        <ok to="end"/>
        <error to="kill"/>
    </action>
    <kill name="kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

4. Submit the Oozie workflow. Enjoy :-)

Error: Cannot run program "testmkdir.sh" (in directory "/home/yarn/nm/usercache/appcache/application_1417676870603_1385/container_1417676870603_1385_01_000006"): error=2, No such file or directory

Wednesday, December 24, 2014

E0800: Action it is not running its in [PREP] state


ISSUE : E0800: Action it is not running its in [PREP] state
You may get the following error when running a sub-workflow inside a main workflow:
EXCEPTION Message:
140317114034876-oozie-biad-W] ACTION[0000002-140317114034876-oozie-biad-W@MDA_TGT_All] XException,
org.apache.oozie.command.CommandException: E0800: Action it is not running its in [PREP] state, action [0000002-140317114034876-oozie-biad-W@MDA_TGT_All]
    at org.apache.oozie.command.wf.CompletedActionXCommand.eagerVerifyPrecondition(CompletedActionXCommand.java:85)
    at org.apache.oozie.command.XCommand.call(XCommand.java:248)
    at org.apache.oozie.service.CallableQueueService$CallableWrapper.run(CallableQueueService.java:175)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:897)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:919)
    at java.lang.Thread.run(Thread.java:738)

Observations:
If we run the sub-workflow on its own, it works fine.
The issue appears only when the sub-workflow is added to the main workflow.

Solution:
The properties we set for the sub-workflow must also be set in the main workflow,
because when running in Oozie, all the properties are passed from the main workflow to the sub-workflow.

Wednesday, August 27, 2014

Big Data Overview



Big Data:
Big data is data that grows from scarce to superabundant day by day. We are all making a big buzz about big data: it is valuable, and a big headache to analyze. It is a very hot topic right now. IBM defines it by three Vs: Volume, Velocity and Variety. As you know, we all use Facebook accounts to upload photos, videos and messages on a daily basis.

       7TB of data are processed by Twitter every day.
       10TB of data are processed by Facebook every day.       
       2 billion internet users in the world today.
       4.6 billion mobile phones in 2011.

Rollin Ford, the CIO of Wal-Mart, said: “Every day I wake up and ask, ‘How can I flow data better, manage data better, analyse data better?’”

Big Data History:
As we know, we are always gathering documents, photos and videos on storage devices, using the latest technologies and devices (with ever more pixels). In the old days, people collected papers and stored them in warehouses or libraries in the form of records, books, etc.

The fact of the matter is that big data is not new. In the 1940s, libraries had to adapt their storage methods to keep up with the rapidly increasing number of new publications and research papers, and in 1941 scholars made the first attempts to quantify the growth rate in the volume of data, known as the “information explosion”.

In 1997, the term “Big Data” was used for the first time in an article by NASA researchers M. Cox and D. Ellsworth, to describe how the growth of data was becoming an issue for the computer systems of the time.

In 1999, Doug Cutting wrote Lucene, which is the key component of the search index in Apache Nutch, Apache Solr and Elasticsearch. While building an open source search engine, Nutch ran into trouble managing computations running on parallel computers. In 2003 and 2004, Google published its GFS and MapReduce papers, which made the route clear, and Doug Cutting implemented Hadoop in 2006.

Today, companies in every industry face big data problems generated by their ever-increasing ability to collect information.

Big Data Key Challenges:
1.      Performance
2.      Network Traffic
3.      Risk Failure
4.      Analysing the data
5.      Addressing data quality
6.      Displaying meaningful results
7.      Dealing with reports by using graphical presentation

Big Data Technologies:

  •  Data Scientist
  •  In-Memory databases
  •  Distributed Databases
  •  NO-SQL
  •  Data warehouse appliances
  •  Hadoop