Friday, June 13, 2014

I covered centralized logging with Logstash in a previous post:
centralized logging with Logstash.

Logstash is great if you do not want to do any programming work, since it gives you a lot of flexibility. However, after running Logstash for a while, some limitations become apparent:

  • Logstash's own logging is not detailed enough when errors happen
  • The overall system is complicated; an error in any component can result in data loss
  • With a large input stream (say 2,000 events per second per input), Logstash can crash without any notification
While exploring streaming solutions across the internet, I found Flume to be a potentially better option. The idea of Flume is more or less the same as Logstash, except that Flume provides a reliable queue system of its own. Backed by Apache, Flume seems more reliable, and it does not have to be a very complicated system. The following tutorial is an introduction to using Flume as a centralized logging system on Ubuntu.

Flume Installation


Install the latest version, Flume 1.5.0, under /opt/flume:
cd ~
sudo apt-get update
sudo apt-get install openjdk-7-jre-headless -y
sudo wget http://mirror.nus.edu.sg/apache/flume/1.5.0/apache-flume-1.5.0-bin.tar.gz
tar -zvxf apache-flume-1.5.0-bin.tar.gz
sudo mv apache-flume-1.5.0-bin /opt/flume
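To double check the installation, you can ask Flume for its version (the exact build details printed will vary with your mirror):
/opt/flume/bin/flume-ng version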
If you want to use the Elasticsearch sink, you will have to install the Lucene libraries. Suppose you are using Elasticsearch 1.2.1, which ships with Lucene 4.8:
sudo wget https://dl.dropboxusercontent.com/s/h491nkeajc67pk7/lucene%204.8.zip
unzip lucene\ 4.8.zip
cd lucene\ 4.8/
sudo mv * /opt/flume/lib
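Note that besides the Lucene jars, the Elasticsearch sink also needs the Elasticsearch client jar itself on Flume's classpath. If it is not already included in the bundle above, copying it from your Elasticsearch installation should work (the path below assumes a default .deb install of Elasticsearch 1.2.1; adjust it to your setup):
# hypothetical path for a .deb install of Elasticsearch 1.2.1; adjust as needed
sudo cp /usr/share/elasticsearch/lib/elasticsearch-1.2.1.jar /opt/flume/lib/
# sanity check that the jars are in place
ls /opt/flume/lib | grep -iE 'lucene|elasticsearch'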
If you want to try out Flume first, follow the official tutorial: http://flume.apache.org/FlumeUserGuide.html#a-simple-example. To run Flume in a normal environment, you will usually have to configure flume-env.sh:
cd /opt/flume/conf
cp flume-env.sh.template flume-env.sh
# edit flume-env.sh to set JAVA_HOME and a reasonable JAVA_OPTS (important if you stream a large volume of data per second)
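As a rough sketch (the JDK path and heap sizes here are assumptions; tune them to your own traffic), flume-env.sh could look like:
# flume-env.sh
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
# give the JVM enough heap for a busy stream; adjust to your own load
export JAVA_OPTS="-Xms512m -Xmx2048m -Dcom.sun.management.jmxremote"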

Elasticsearch Installation

Please follow my earlier post: http://jakege.blogspot.sg/2014/03/how-to-install-elasticsearch.html

Kibana Installation

Kibana just needs to be downloaded from the official site: http://www.elasticsearch.org/overview/kibana/, because it is simply a static local website. All you have to do is edit config.js to point it at the right Elasticsearch server and open index.html.
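For Kibana 3 the relevant setting in config.js is elasticsearch; pointing it at one of your nodes (the address below is just an example) looks roughly like:
// in config.js: point Kibana at the Elasticsearch HTTP port (9200)
elasticsearch: "http://192.168.1.1:9200",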

Centralized Logging System with Flume

For centralized logging we usually use the standard Flume consolidation setup (see the consolidation topology in the Flume User Guide). It needs two kinds of Flume agent: a shipping agent and a collecting agent.

Shipping agent

The shipping agent tails a log file and ships events to the Avro port of the collecting agent. The set-up looks like this:
################################################
# Name the components on this agent
################################################
 
agent1.sources = source1
agent1.sinks = sink1 sink2
agent1.channels = channel1
 
################################################
# Describe Source
################################################
 
# Source Tail
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /var/log/nginx/nginx_access.log
 
################################################
# Describe Interceptors
################################################
 
agent1.sources.source1.interceptors = interceptor1 interceptor2
#add from host
agent1.sources.source1.interceptors.interceptor1.type = host
agent1.sources.source1.interceptors.interceptor1.hostHeader = host
#add timestamp
agent1.sources.source1.interceptors.interceptor2.type = timestamp
 
################################################
# Describe Sink
################################################
 
# Avro sinks; usually there are two collection points, for load balancing and HA
agent1.sinks.sink1.type = avro 
agent1.sinks.sink1.hostname = 192.168.0.1
agent1.sinks.sink1.port = 5000
 
agent1.sinks.sink2.type = avro
agent1.sinks.sink2.hostname = 192.168.0.2
agent1.sinks.sink2.port = 5000
 
 
################################################
# Describe Sink Group
################################################
 
# Sink Group
agent1.sinkgroups = load_group1
agent1.sinkgroups.load_group1.sinks = sink1 sink2
agent1.sinkgroups.load_group1.processor.type = load_balance
agent1.sinkgroups.load_group1.processor.backoff = true
agent1.sinkgroups.load_group1.processor.selector = round_robin
 
################################################
# Describe Channel
################################################
 
# Channel Memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 100000
agent1.channels.channel1.transactionCapacity = 300
 
################################################
# Bind the source and sink to the channel
################################################
 
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink2.channel = channel1
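You can test the shipping agent in the foreground before turning it into a service; assuming the settings above are saved as /opt/flume/conf/shipping.conf (the file name is up to you), run:
cd /opt/flume
bin/flume-ng agent -n agent1 -c conf -f conf/shipping.conf -Dflume.root.logger=INFO,console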

Collecting agent

The collecting agent takes input from the Avro port and writes it to Elasticsearch. The set-up looks like this:
################################################
# Name the components on this agent
################################################
 
agent2.sources = source1
agent2.sinks = sink1
agent2.channels = channel1
 
################################################
# Describe Source
################################################
 
# Source Avro
agent2.sources.source1.type = avro
agent2.sources.source1.bind = 0.0.0.0 
agent2.sources.source1.port = 5000
 
################################################
# Describe Interceptors
################################################
# an example of nginx access log regex match
agent2.sources.source1.interceptors = interceptor1
agent2.sources.source1.interceptors.interceptor1.type = regex_extractor
agent2.sources.source1.interceptors.interceptor1.regex = ^(.*) ([a-zA-Z\\.\\@\\-\\+_%]+) ([a-zA-Z\\.\\@\\-\\+_%]+) \\[(.*)\\] \\"(POST|GET) ([A-Za-z0-9\\$\\.\\+\\@#%_\\/\\-]*)\\??(.*) (.*)\\" ([a-zA-Z0-9\\.\\/\\s\-]*) (.*) ([0-9]+) ([0-9]+) ([0-9\\.]+)
agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 s9 s10 s11 s12 s13
agent2.sources.source1.interceptors.interceptor1.serializers.s1.name = clientip
agent2.sources.source1.interceptors.interceptor1.serializers.s2.name = ident
agent2.sources.source1.interceptors.interceptor1.serializers.s3.name = auth
agent2.sources.source1.interceptors.interceptor1.serializers.s4.name = logtime
agent2.sources.source1.interceptors.interceptor1.serializers.s5.name = method
agent2.sources.source1.interceptors.interceptor1.serializers.s6.name = request
agent2.sources.source1.interceptors.interceptor1.serializers.s7.name = param
agent2.sources.source1.interceptors.interceptor1.serializers.s8.name = httpversion
agent2.sources.source1.interceptors.interceptor1.serializers.s9.name = referrer
agent2.sources.source1.interceptors.interceptor1.serializers.s10.name = agent
agent2.sources.source1.interceptors.interceptor1.serializers.s11.name = response
agent2.sources.source1.interceptors.interceptor1.serializers.s12.name = bytes
agent2.sources.source1.interceptors.interceptor1.serializers.s13.name = requesttime
 
 
################################################
# Describe Sink
################################################
 
# Sink ElasticSearch
# The Elasticsearch client jars must be copied into flume/lib (see the installation section above)
# clusterName must match cluster.name in elasticsearch/config/elasticsearch.yml (data is stored under data/<clusterName>)
agent2.sinks.sink1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent2.sinks.sink1.hostNames = 192.168.1.1:9300,192.168.1.2:9300
agent2.sinks.sink1.indexName = nginx
agent2.sinks.sink1.indexType = nginx_access
agent2.sinks.sink1.clusterName = elasticsearch
agent2.sinks.sink1.batchSize = 1000
agent2.sinks.sink1.ttl = 2
#this serializer is crucial in order to use kibana
agent2.sinks.sink1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer
 
 
 
################################################
# Describe Channel
################################################
 
# Channel Memory
agent2.channels.channel1.type = memory
agent2.channels.channel1.capacity = 10000000
agent2.channels.channel1.transactionCapacity = 1000
 
################################################
# Bind the source and sink to the channel
################################################
 
agent2.sources.source1.channels = channel1
agent2.sinks.sink1.channel = channel1
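Once the collecting agent is running, a quick way to confirm that events are reaching Elasticsearch is to count documents in today's index (the sink appends the date to indexName, e.g. nginx-2014-06-13):
curl 'http://192.168.1.1:9200/nginx-2014-06-13/_count?pretty'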

Start-up script

If you need a start-up service script, the one below assumes your agent is named agent1 and your config file is flume.conf under /opt/flume/conf; change these values accordingly.
#!/bin/sh

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Starts a Flume agent
#
# chkconfig: 345 90 10
# description: Flume agent
#
### BEGIN INIT INFO
# Provides:          flume-ng-agent
# Required-Start:    $remote_fs
# Should-Start:
# Required-Stop:     $remote_fs
# Should-Stop:
# Default-Start:     3 4 5
# Default-Stop:      0 1 2 6
# Short-Description: Flume agent
### END INIT INFO
 
. /lib/lsb/init-functions
 
# Name of the agent
FLUME_AGENT_NAME=agent1

# Setting up a few defaults that can later be overridden in /etc/default/flume-ng-agent
FLUME_LOG_DIR=/opt/flume/logs
FLUME_CONF_DIR=/opt/flume/conf
FLUME_RUN_DIR=/var/run/flume
FLUME_HOME=/opt/flume
FLUME_USER=mozat
 
# Autodetect JAVA_HOME if not defined
if [ -e /usr/libexec/bigtop-detect-javahome ]; then
  . /usr/libexec/bigtop-detect-javahome
elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
  . /usr/lib/bigtop-utils/bigtop-detect-javahome
fi
 
STATUS_RUNNING=0
STATUS_DEAD=1
STATUS_DEAD_AND_LOCK=2
STATUS_NOT_RUNNING=3
 
ERROR_PROGRAM_NOT_INSTALLED=5
 
FLUME_LOCK_DIR="/var/lock/subsys/"
LOCKFILE="${FLUME_LOCK_DIR}/flume-ng-agent"
desc="Flume agent daemon"
 
FLUME_CONF_FILE=${FLUME_CONF_FILE:-${FLUME_CONF_DIR}/flume.conf}
EXEC_PATH=/opt/flume/bin/flume-ng
FLUME_PID_FILE=${FLUME_RUN_DIR}/flume.pid
 
# These directories may be tmpfs and may or may not exist
# depending on the OS (ex: /var/lock/subsys does not exist on debian/ubuntu)
for dir in "$FLUME_RUN_DIR" "$FLUME_LOCK_DIR"; do
  [ -d "${dir}" ] || install -d -m 0755 -o $FLUME_USER -g $FLUME_USER ${dir}
done
 
FLUME_SHUTDOWN_TIMEOUT=${FLUME_SHUTDOWN_TIMEOUT:-60}
 
start() {
  [ -x $exec ] || exit $ERROR_PROGRAM_NOT_INSTALLED
 
  checkstatus
  status=$?
  if [ "$status" -eq "$STATUS_RUNNING" ]; then
    exit 0
  fi
 
  log_success_msg "Starting $desc (flume-ng-agent): "
  /bin/su -s /bin/bash -c "cd /opt/flume;/bin/bash -c 'echo \$\$ > ${FLUME_PID_FILE} && exec ${EXEC_PATH} agent -n $FLUME_AGENT_NAME -c conf -f $FLUME_CONF_FILE -Dflume.monitoring.type=http -Dflume.monitoring.port=30001 >>${FLUME_LOG_DIR}/flume.${FLUME_AGENT_NAME}.init.log 2>&1 ' &" $FLUME_USER
  RETVAL=$?
  [ $RETVAL -eq 0 ] && touch $LOCKFILE
  return $RETVAL
}
 
stop() {
  if [ ! -e $FLUME_PID_FILE ]; then
    log_failure_msg "Flume agent is not running"
    exit 0
  fi
 
  log_success_msg "Stopping $desc (flume-ng-agent): "
 
  FLUME_PID=`cat $FLUME_PID_FILE`
  if [ -n "$FLUME_PID" ]; then
    log_success_msg "kill process ${FLUME_PID}"
    kill -TERM ${FLUME_PID} &>/dev/null
#    for i in `seq 1 ${FLUME_SHUTDOWN_TIMEOUT}` ; do
#      kill -0 ${FLUME_PID} &>/dev/null || break
#      sleep 1
#    done
    kill -KILL ${FLUME_PID} &>/dev/null
  fi
  rm -f $LOCKFILE $FLUME_PID_FILE
  return 0
}
 
restart() {
  stop
  start
}
 
checkstatus(){
  pidofproc -p $FLUME_PID_FILE java > /dev/null
  status=$?
 
  case "$status" in
    $STATUS_RUNNING)
      log_success_msg "Flume agent is running"
      ;;
    $STATUS_DEAD)
      log_failure_msg "Flume agent is dead and pid file exists"
      ;;
    $STATUS_DEAD_AND_LOCK)
      log_failure_msg "Flume agent is dead and lock file exists"
      ;;
    $STATUS_NOT_RUNNING)
      log_failure_msg "Flume agent is not running"
      ;;
    *)
      log_failure_msg "Flume agent status is unknown"
      ;;
  esac
  return $status
}
 
condrestart(){
  [ -e ${LOCKFILE} ] && restart || :
}
 
case "$1" in
  start)
    start
    ;;
  stop)
    stop
    ;;
  status)
    checkstatus
    ;;
  restart)
    restart
    ;;
  condrestart|try-restart)
    condrestart
    ;;
  *)
    echo $"Usage: $0 {start|stop|status|restart|try-restart|condrestart}"
    exit 1
esac
 
exit $RETVAL
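To install it as a service on Ubuntu, something along these lines should work (the file name flume-ng-agent is an assumption; also make sure the FLUME_USER you configured actually exists):
sudo cp flume-ng-agent /etc/init.d/flume-ng-agent
sudo chmod +x /etc/init.d/flume-ng-agent
sudo update-rc.d flume-ng-agent defaults
sudo service flume-ng-agent start
sudo service flume-ng-agent status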

Index template

To manage the log indices better and make them fit nicely into Kibana, it is best to define an index template, for example for our nginx logs:
{
    "template": "nginx-*",
    "settings": {
        "number_of_shards": 5,
        "number_of_replicas": 1,
        "index.cache.field.type": "soft",
        "index.refresh_interval": "5s",
        "index": {
            "query": { "default_field": "@message" },
            "store": { "compress": { "stored": true, "tv": true } }
        }
    },
    "mappings": {
        "_default_": {
            "_all": { "enabled": false },
            "_source": { "compress": true },
            "_ttl": { "enabled": true, "default": "2d" },
            "properties": {
                "@timestamp": { "type": "date", "index": "not_analyzed" },
                "@message": { "type": "string", "index": "analyzed" },
                "@source_host": { "type": "string", "index": "not_analyzed" },
                "@fields": {
                    "type": "object",
                    "properties": {
                        "agent": { "type": "string", "index": "analyzed" },
                        "request": { "type": "string", "index": "not_analyzed" },
                        "host": { "type": "string", "index": "not_analyzed" },
                        "clientip": { "type": "string", "index": "not_analyzed" },
                        "file": { "type": "string", "index": "not_analyzed" },
                        "bytes": { "type": "integer" },
                        "offset": { "type": "integer" },
                        "requesttime": { "type": "float" },
                        "logtime": { "type": "string", "index": "not_analyzed" }
                    }
                }
            }
        }
    }
}
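Register the template with Elasticsearch before any nginx-* index is created; assuming you saved it as nginx_template.json, something like this should work:
curl -XPUT 'http://192.168.1.1:9200/_template/nginx_template' -d @nginx_template.json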

Elasticsearch Curator

To manage the indices (close, delete, and optimize outdated ones), we use Elasticsearch Curator. Install version 1.0 with:
pip install elasticsearch-curator
To close indices older than 1 day and delete those older than 4 days, using "-" as the date separator (Flume uses "-" in index names, e.g. nginx-2014-06-13), we put this in a cron job:
20 12 * * * /usr/local/bin/curator --host 192.168.1.1 --prefix nginx- -d 4 -c 1 -s -
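To check which daily indices currently exist (and that Curator is pruning them as expected), the _cat API is handy:
curl 'http://192.168.1.1:9200/_cat/indices/nginx-*?v'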

Cloudera management tool for flume

To manage Flume, you can use the Cloudera management tool. Finally, you will be able to see the results in Kibana with a customized dashboard setting.

Comments:

  1. Hi,

    Thanks for this informative article. I am having issues setting up flume with elasticsearch and kibana.
    I am able to save the data using Flume into HDFS by using HDFS sink. Now the same data I want to send to elasticsearch so I can visualize in Kibana. 
    But unfortunately, it does not seem to work.


    The data I want to send looks like this:
    {'id': '26', 'value': '8'}
    {'id': '27', 'value': '16'}
    {'id': '28', 'value': '21'}
    {'id': '29', 'value': '10'}


    I have created an elasticsearch index with this mapping:
    curl -XPUT 'localhost:9200/riz_index?pretty' -H 'Content-Type: application/json' -d'
    {
     "mappings" : {
     "_default_" : {
     "properties" : {
     "id" : {"type": "integer" },
     "value" : {"type": "integer" }
     }
     }
     }
    }



    The flume conf file:


    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 127.0.0.5
    a1.sources.r1.port = 5005
    # Describe the sink ES
    a1.sinks = k1
    a1.sinks.k1.type = elasticsearch
    a1.sinks.k1.hostNames = localhost:9200,localhost:9300
    a1.sinks.k1.indexName = riz_index
    a1.sinks.k1.indexType = item
    a1.sinks.k1.clusterName = elasticsearch
    a1.sinks.k1.batchSize = 500
    a1.sinks.k1.ttl = 5d
    a1.sinks.k1.serializer=org.apache.flume.sink.elasticsearch.v12.ElasticSearchLogStashEventSerializer
    a1.sinks.k1.channel = c1
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1


    I can see the elasticsearch index created in kibana and the specified fields (id and value) but no data in those fields.
    Is there anything that I am missing?


    Thanks
