Monday, May 16, 2016

Flume File Channel Problems

The following errors were observed with the Flume File Channel:

Error appending event to channel. Channel might be full. Consider increasing the channel capacity or make sure the sinks perform faster.
org.apache.flume.ChannelException: Commit failed due to IO error [channel=channel6]
        at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doRollback(FileChannel.java:616)
        at org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
        at org.apache.flume.source.jms.JMSSource.doProcess(JMSSource.java:258)
        at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:54)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Usable space exhaused, only 524054528 bytes remaining, required 524288026 bytes
        at org.apache.flume.channel.file.Log.rollback(Log.java:712)
        at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doRollback(FileChannel.java:614)
        ... 6 more
15/04/01 07:25:27 ERROR file.Log$BackgroundWorker: Error doing checkpoint
java.io.IOException: Usable space exhaused, only 524054528 bytes remaining, required 524288000 bytes
        at org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:985)
        at org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:968)
        at org.apache.flume.channel.file.Log.access$200(Log.java:75)
        at org.apache.flume.channel.file.Log$BackgroundWorker.run(Log.java:1183)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Observations: 
At checkpoint time, the File Channel deletes the data files whose events have all been taken. However, the channel keeps at least 2 log files per data directory even if all of their
events have been taken. Once all events from log-1 and log-2 are taken and new events start being written to log-3, log-1 will be deleted at the next checkpoint. Unless log-3 is created,
neither log-1 nor log-2 will be deleted.
The interval between two consecutive checkpoints is set to 30 seconds by default, though it is configurable.
The File Channel writes out a checkpoint periodically to make restart or recovery faster. The checkpoint is written to the directory specified as the value of the checkpointDir parameter. If the channel is stopped while it is checkpointing, the checkpoint may be incomplete or corrupt. A corrupt or incomplete checkpoint could make the restart of the channel extremely slow, as the channel would need to read and replay all data files.

To avoid this problem, it is recommended that the useDualCheckpoints parameter be set to true and that backupCheckpointDir be set. It is recommended that this directory be on a different disk than the one where the original checkpoint is stored. When these parameters are set, Flume will back up the checkpoint to the backupCheckpointDir as soon as it is completed. This ensures that once the channel has been in operation for a brief period of time (enough time for the first checkpoint to be written out and backed up), it will be able to restart from a checkpoint even if the newest one is corrupt or incomplete, reducing the restart time drastically. The time period between consecutive checkpoints is controlled by the checkpointInterval parameter.
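For example, a minimal configuration enabling dual checkpoints for the channel named in the log above might look like the following (the agent name "agent" and the paths are placeholders of mine; only the parameter names come from Flume):

    agent.channels.channel6.type = file
    # primary checkpoint location
    agent.channels.channel6.checkpointDir = /disk1/flume/channel6/checkpoint
    # keep a second copy of each completed checkpoint on a different disk
    agent.channels.channel6.useDualCheckpoints = true
    agent.channels.channel6.backupCheckpointDir = /disk2/flume/channel6/checkpoint-backup
    agent.channels.channel6.dataDirs = /disk1/flume/channel6/data
    # time between checkpoints in milliseconds (30000 = the 30-second default)
    agent.channels.channel6.checkpointInterval = 30000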

To ensure that the channel does not write to disks with low disk space, the minimumRequiredSpace parameter can be configured. Once the disk space on a specific disk goes down to the value set by this parameter (or 1 MB, whichever is higher), the channel ceases operation. To conserve system resources and not affect performance, the channel does not check the disk space on every write, but does it periodically, maintaining counters internally to calculate the space available on disk. This makes the assumption that no other channel or process is writing to this disk, which would make the available disk space decrease faster.
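As a rough sketch (the value here is arbitrary and only illustrates that the unit is bytes), the threshold could be raised for the same channel like this:

    # stop accepting events once less than ~1 GB of usable space remains
    agent.channels.channel6.minimumRequiredSpace = 1073741824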

The File Channel is implemented as a write-ahead log. The channel keeps a reference count of the number of events in each data file that still need to be taken by the sink. Once all the events in a file have been taken, the file will be deleted after the next checkpoint. If you want the files to be deleted faster, you can reduce the maximum size of a data file through the config parameter "maxFileSize" (the maximum size, in bytes, that each individual log file is allowed to grow to). By default, the maxFileSize is around 1.5 GB. As an experiment you can reduce the file size and see whether the files are getting deleted (each data directory will keep at least 2 files even if all events have been taken).
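For instance, a smaller data-file size for that experiment might be set like this (the value is just an example):

    # roll data files at ~512 MB instead of the default (value is in bytes)
    agent.channels.channel6.maxFileSize = 536870912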


Monday, May 2, 2016

Kafka issues - Windows

You may run into the following issues while running Kafka locally on Windows.

1. WARN Error while fetching metadata with correlation id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
         Initially the Kafka setup was created successfully and I was able to publish messages with the producer and consume them with the consumer. Then I started getting max offset reached error messages.
         So I manually deleted all the kafka-log, zookeeper-log and topic folders. After that, publishing a message from the producer failed with "WARN Error while fetching metadata with correlation id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)".
         I tried many ways to fix this and finally found the solution:
              In the config\server.properties file, set the below 2 parameters.
                  host.name=localhost
                  advertised.host.name=localhost
Then restart the Kafka server and the producer. Now, when you send messages from the producer, the consumer will automatically consume them.
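If you want to double-check that the topic has a leader again before producing (an optional verification step, not part of the fix itself), you can describe the topic:

                  .\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic test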
 
2. Always have the kafka logs and controller files created in the main folder by setting the kafka.logs.dir parameter in the config file.

3. Run Kafka on Windows as below:
             step1 : start the zookeeper server
                  .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
             step2 : start the kafka server
                  .\bin\windows\kafka-server-start.bat .\config\server.properties
             step3 : list the topics that exist on the kafka broker
                 .\bin\windows\kafka-topics.bat --list  --zookeeper localhost:2181
             step4 : create a topic
                  .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
             step5 : run the producer for the particular topic
                  .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
             step6 : run the consumer for the particular topic
                 .\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning
      => the consumer consumes messages starting from the first message.
                  .\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
      => the consumer consumes messages starting from the latest offset (recent messages only)

Scala IDE Issues

1. Error : Cross compiled with an incompatible version of scala (2.10.0) while building a Scala application.
  Solution : You may have installed a newer version of Scala and compiled against it, while Scala IDE ships with a built-in Scala 2.10.0, so two Scala versions are visible in the project.
  Go to the Project Build Path -> Libraries -> select the Scala Library (currently Scala 2.11.7 in my Eclipse) -> Edit.
       You will see four types of Scala Library containers:
               - Latest 2.11 bundle (dynamic)
               - Latest 2.10 bundle (dynamic)
               - Fixed Scala Library container (2.11.7)
               - Fixed Scala Library container (2.10.6)
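Pick the container that matches the Scala version your project is built against. If the project is a Maven project, you can also pin the matching Scala library in pom.xml so the build and the IDE stay in sync; a minimal sketch (the version shown is just the one from my setup):

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.7</version>
    </dependency>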

2. After building the project, the Java library automatically changes back to version 1.6 even if we changed it to 1.8.
    Solution: The pom.xml created for a maven-scala project has the below properties:
    <maven.compiler.source>1.6</maven.compiler.source>
    <maven.compiler.target>1.6</maven.compiler.target>
   Because these properties point to 1.6, the project preferences change back to Java 1.6 after every build.
To fix this, change the version numbers.
For example, to point the project to Java 1.8:
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
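If your pom.xml drives the compiler through the maven-compiler-plugin rather than these properties, the same versions can be set there; a minimal sketch:

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>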

3. Error: Could not find or load main class com.test.scala.CountWord
   Solution: By default the Scala compiler targets JVM 1.6. If you change the Scala compiler target from jvm-1.6 to jvm-1.8, you may get the "Could not find or load main class" error.
        Right-click the project -> Build Path -> Scala Compiler -> change the target back to the default jvm-1.6, and the problem is solved.