How to stream HDFS log data into Kafka

Apache Eagle consumes data through Kafka[1] topics in some topologies, such as the HDFS audit log topology. To enable full monitoring, you need to stream that data into a Kafka topic.

There are two ways to do that. The first is Logstash, which supports Kafka natively through an output plugin; the second is to install a log4j Kafka appender on the NameNode.

Logstash-kafka

  • Step 1: Create a Kafka topic as the streaming input.

    Here is a sample Kafka command that creates the topic ‘sandbox_hdfs_audit_log’:

    cd <kafka-home>
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sandbox_hdfs_audit_log
    
  • Step 2: Install the logstash-kafka plugin

    • For Logstash 1.5.x, logstash-kafka has been integrated into logstash-input-kafka and logstash-output-kafka, which ship with Logstash 1.5, so you can use it directly.

    • For Logstash 1.4.x, install logstash-kafka first. Note that this version does not support partition_key_format.

  • Step 3: Create a Logstash configuration file under ${LOGSTASH_HOME}/conf. Here is a sample.

      input {
          file {
              type => "hdp-nn-audit"
              path => "/path/to/audit.log"
              start_position => end
              sincedb_path => "/var/log/logstash/"
          }
      }
    
      filter {
          if [type] == "hdp-nn-audit" {
              grok {
                  match => ["message", "ugi=(?<user>([\w\d\-]+))@|ugi=(?<user>([\w\d\-]+))/[\w\d\-.]+@|ugi=(?<user>([\w\d.\-_]+))[\s(]+"]
              }
          }
      }
    
      output {
          if [type] == "hdp-nn-audit" {
              kafka {
                  codec => plain {
                      format => "%{message}"
                  }
                  broker_list => "localhost:9092"
                  topic_id => "sandbox_hdfs_audit_log"
                  request_required_acks => 0
                  request_timeout_ms => 10000
                  producer_type => "async"
                  message_send_max_retries => 3
                  retry_backoff_ms => 100
                  queue_buffering_max_ms => 5000
                  queue_enqueue_timeout_ms => 5000
                  batch_num_messages => 200
                  send_buffer_bytes => 102400
                  client_id => "hdp-nn-audit"
                  partition_key_format => "%{user}"
              }
              # stdout { codec => rubydebug }
          }
      }
    
  • Step 4: Start Logstash

    bin/logstash -f conf/sample.conf
    
  • Step 5: Check whether logs are flowing into the Kafka topic specified by topic_id
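
One way to carry out Step 5 is to read a few messages back with the console consumer that ships with Kafka, using the same --zookeeper form as the other commands on this page. The sketch below is only an illustration: the KAFKA_HOME default and the helper name peek_topic are assumptions, and the ZooKeeper address and topic name are taken from Step 1.

```shell
# Assumption: KAFKA_HOME points at the Kafka installation; the default
# below is only a guess and should be adjusted for your environment.
KAFKA_HOME="${KAFKA_HOME:-/usr/hdp/current/kafka-broker}"
TOPIC="sandbox_hdfs_audit_log"
ZOOKEEPER="localhost:2181"
CONSUMER="${KAFKA_HOME}/bin/kafka-console-consumer.sh"

# Hypothetical helper: read up to five messages from the start of the
# topic and then exit, instead of tailing the topic forever.
peek_topic() {
    "${CONSUMER}" --zookeeper "${ZOOKEEPER}" --topic "${TOPIC}" \
        --from-beginning --max-messages 5
}

if [ -x "${CONSUMER}" ]; then
    peek_topic
else
    echo "kafka-console-consumer.sh not found under ${KAFKA_HOME}; set KAFKA_HOME" >&2
fi
```

If audit log lines are printed, Logstash is delivering events to the topic. If nothing appears, check that the file named in path is being written to, since start_position => end only picks up lines appended after Logstash starts.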

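The grok filter in Step 3 pulls the user name out of the ugi= field, and partition_key_format => "%{user}" then uses it as the Kafka partition key. To get a feel for what ends up in that field, the sketch below approximates the extraction with sed. It is a simplified stand-in rather than an exact translation of the grok pattern: it uses a single expression instead of the three alternatives, the function name extract_user is hypothetical, and the sample audit lines are made up for illustration.

```shell
# Hypothetical helper: print the user name found in the ugi= field of each
# audit log line read from stdin. Simplified stand-in for the grok pattern
# in Step 3, not an exact translation of it.
extract_user() {
    sed -nE 's/.*ugi=([A-Za-z0-9._-]+).*/\1/p'
}

# Illustrative audit lines (made up for this example): a simple user,
# a Kerberos user principal, and a Kerberos service principal.
printf '%s\n' \
    'allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp dst=null perm=null' \
    'allowed=true ugi=alice@EXAMPLE.COM (auth:KERBEROS) ip=/10.0.2.15 cmd=open src=/tmp/a dst=null perm=null' \
    'allowed=true ugi=hbase/sandbox.hortonworks.com@EXAMPLE.COM (auth:KERBEROS) ip=/10.0.2.15 cmd=listStatus src=/apps/hbase dst=null perm=null' \
    | extract_user
```

For these three lines the helper prints hdfs, alice, and hbase, one per line: the realm and host parts of a principal are dropped, so all events for the same user share one key.
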
Log4j Kafka Appender

Note that if you use Ambari[2], as in the sandbox, you must apply the steps below through the Ambari UI. In addition, the NameNode must be restarted.

  • Step 1: Create a Kafka topic. Here is an example Kafka command that creates the topic “sandbox_hdfs_audit_log”:

    cd <kafka-home>
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sandbox_hdfs_audit_log
    
  • Step 2: Edit $HADOOP_CONF_DIR/log4j.properties and add a log4j appender named “KAFKA_HDFS_AUDIT” for HDFS audit logging:

    log4j.appender.KAFKA_HDFS_AUDIT=org.apache.eagle.log4j.kafka.KafkaLog4jAppender
    log4j.appender.KAFKA_HDFS_AUDIT.Topic=sandbox_hdfs_audit_log
    log4j.appender.KAFKA_HDFS_AUDIT.BrokerList=sandbox.hortonworks.com:6667
    log4j.appender.KAFKA_HDFS_AUDIT.KeyClass=org.apache.eagle.log4j.kafka.hadoop.AuditLogKeyer
    log4j.appender.KAFKA_HDFS_AUDIT.Layout=org.apache.log4j.PatternLayout
    log4j.appender.KAFKA_HDFS_AUDIT.Layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
    log4j.appender.KAFKA_HDFS_AUDIT.ProducerType=async
    #log4j.appender.KAFKA_HDFS_AUDIT.BatchSize=1
    #log4j.appender.KAFKA_HDFS_AUDIT.QueueSize=1
    

  • Step 3: Edit $HADOOP_CONF_DIR/hadoop-env.sh and add a reference to KAFKA_HDFS_AUDIT in HADOOP_NAMENODE_OPTS:

    -Dhdfs.audit.logger=INFO,DRFAAUDIT,KAFKA_HDFS_AUDIT
    

  • Step 4: Edit $HADOOP_CONF_DIR/hadoop-env.sh and append the following line to it:

    export HADOOP_CLASSPATH=${HADOOP_CLASSPATH}:/path/to/eagle/lib/log4jkafka/lib/*
    

  • Step 5: Save the changes and restart the NameNode.

  • Step 6: Check whether logs are flowing into the topic sandbox_hdfs_audit_log:

    $ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic sandbox_hdfs_audit_log
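
Steps 3 and 4 both edit hadoop-env.sh. As a rough sketch, the two additions might look like the lines below. Appending the option to HADOOP_NAMENODE_OPTS is an assumption, not something the steps prescribe: if your hadoop-env.sh already sets -Dhdfs.audit.logger inside HADOOP_NAMENODE_OPTS, edit that existing value instead of adding a second one. The classpath entry keeps the /path/to/eagle placeholder from Step 4.

```shell
# Sketch of the hadoop-env.sh additions from Steps 3 and 4.
# Assumption: the audit logger option is appended to any existing
# HADOOP_NAMENODE_OPTS; adapt this if the option is already set there.
export HADOOP_NAMENODE_OPTS="${HADOOP_NAMENODE_OPTS} -Dhdfs.audit.logger=INFO,DRFAAUDIT,KAFKA_HDFS_AUDIT"

# Put the Eagle log4j Kafka appender jars on the Hadoop classpath.
# The quotes keep the trailing * literal at assignment time.
export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:/path/to/eagle/lib/log4jkafka/lib/*"
```

DRFAAUDIT keeps the regular audit log file, while KAFKA_HDFS_AUDIT additionally sends each audit event to the appender defined in Step 2.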
    

Footnotes

  1. All mentions of “Kafka” on this page refer to Apache Kafka.

  2. All mentions of “Ambari” on this page refer to Apache Ambari.

Copyright © 2015 The Apache Software Foundation, Licensed under the Apache License, Version 2.0.
Apache Eagle, Eagle, Apache Hadoop, Hadoop, Apache HBase, HBase, Apache Hive, Hive, Apache Ambari, Ambari, Apache Spark, Spark, Apache Kafka, Kafka, Apache Storm, Storm, Apache Maven, Maven, Apache Tomcat, Tomcat, Apache Derby, Derby, Apache Cassandra, Cassandra, Apache ZooKeeper, ZooKeeper, Apache, the Apache feather logo, and the Apache project logo are trademarks of The Apache Software Foundation.