LBActive

For the latest version, go to the LBActive project on SourceForge.

LBActive is a dynamic and active load balancer health check system written in Python. Currently it supports only Nginx.

The next release (if there is one) will move to a more modular design. That design will add support for other load balancers that lack dynamic and/or active checks, and for other types of backend checks if needed.

#!/usr/bin/env python

import argparse
import ConfigParser
import logging
import logging.handlers
import multiprocessing
import os
import pprint
import re
import shutil
import signal
import socket
import string
import sys
import tempfile
import time
from subprocess import call

import requests
from daemon import runner

HOST = "localhost"
PORT = 3333
VERSION = '0.2 beta'

# Settings
# Everything below is read once, at import time, from CONFIG_FILE.  Values
# taken from the file are strings while the fallbacks are the literals shown,
# so consumers convert with int()/float() where they need a number.
CONFIG_FILE = '/etc/lbactive/lbactive.cfg'

if os.path.isfile(CONFIG_FILE):
	config = ConfigParser.RawConfigParser()
	# Context manager so the handle is closed once parsing is done; readfp()
	# still raises if the file exists but cannot be opened.
	with open(CONFIG_FILE) as config_fp:
		config.readfp(config_fp)
else:
	print("/etc/lbactive/lbactive.cfg not found! Exiting!!")
	# Non-zero status so "lbactive start" (and the init script) report failure.
	sys.exit(1)


def _main_option(option, default):
	"""Return *option* from the [main] section, or *default* if it is not set."""
	if config.has_option("main", option):
		return config.get("main", option)
	return default


LOG_FILENAME = _main_option("logfile", "/var/log/lbactive")
PID_FILE = _main_option("pidfile", '/var/run/lbactive.pid')
LOG_LEVEL = _main_option("loglevel", "INFO")
CHECK_INTERVAL = _main_option("checkinterval", 5)
UPDATE_INTERVAL = _main_option("updateinterval", 2)
LOAD_DIFF = _main_option("loaddiff", 2)
NGINX = _main_option("nginx_location", "/usr/sbin/nginx")
TMP_DIR = tempfile.gettempdir()

# Map e.g. "info" -> logging.INFO; anything that is not a level name is fatal.
LOG_LEVEL_NUMERIC = getattr(logging, LOG_LEVEL.upper(), None)
if not isinstance(LOG_LEVEL_NUMERIC, int):
	raise ValueError('Invalid log level: %s' % LOG_LEVEL)

logger = logging.getLogger('lbactive')
# Keep requests' connection-pool chatter out of the log below WARNING.
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.basicConfig(level=LOG_LEVEL_NUMERIC, format='%(asctime)s %(name)s %(levelname)s: %(message)s', filename=LOG_FILENAME)

# Per-cluster state keyed by config section name.  Every section other than
# [main] names an nginx upstream whose servers are read from
# <config_dir>/<section>.conf; Daemon.run() later rewrites that file.
clusters = {}


def _section_option(section, option, default):
	"""Return *option* from *section*, or *default* when it is not set."""
	if config.has_option(section, option):
		return config.get(section, option)
	return default


def _parse_upstream_file(path):
	"""Parse an nginx upstream file into ``(lb_type, nodes)``.

	``server`` lines become node dicts. ``ip_hash``, ``least_conn`` and
	``round_robin`` set lb_type to that word. A ``hash`` line is kept whole
	(minus its trailing ';'). All other lines are ignored.
	"""
	lb_type = ""
	nodes = []
	with open(path) as upstream:
		for cfg_line in upstream:
			cfg_line = re.sub(';$', '', cfg_line.strip())
			first_word = cfg_line.partition(' ')[0]
			if first_word == "server":
				cfg_line = re.sub('^server ', '', cfg_line)
				address = cfg_line.partition(' ')[0]
				xtra_options = ''
				for word in cfg_line.split(" "):
					# weight= and down are regenerated on every rewrite, so
					# only the remaining server parameters are carried over.
					if "weight=" not in word and "down" not in word and address != word:
						xtra_options = " ".join([xtra_options, word])
				nodes.append({'ip': address, 'last_check': False, 'last_idle': 0,
				              'last_update_action': False, 'xtra_options': xtra_options})
			elif first_word in ("ip_hash", "least_conn", "round_robin"):
				lb_type = first_word
			elif first_word == "hash":
				lb_type = cfg_line
	return lb_type, nodes


for section in config.sections():
	if section == "main":
		continue
	config_dir = config.get(section, "config_dir")
	upstream_path = config_dir + "/" + section + ".conf"
	logger.info("Loading Nginx Upstream Config: " + upstream_path)
	if not os.path.isfile(upstream_path):
		logger.warning("NginX config file not found: " + upstream_path)
		continue
	# A fresh dict per section so nothing leaks from a previous section.
	current = {
		'config_dir': config_dir,
		# Always a string: the checker concatenates it into log messages.
		'port': str(_section_option(section, "port", PORT)),
		'load_diff': _section_option(section, "loaddiff", LOAD_DIFF),
		'update_interval': _section_option(section, "updateinterval", UPDATE_INTERVAL),
		'last_update': 0,
		'update': False,
		'forced_update': False,
	}
	if config.has_option(section, "clustertype"):
		current['cluster_type'] = config.get(section, "clustertype")
	logger.debug("Health Check Port: " + current['port'])
	current['lb_type'], current['nodes'] = _parse_upstream_file(upstream_path)
	clusters[section] = current

if not clusters:
	logger.critical("No clusters configured! Exiting.")
	print("No clusters configured! Exiting.")
	sys.exit(1)

class Daemon(object):
	"""python-daemon application that health-checks nginx upstream servers.

	Every CHECK_INTERVAL seconds it performs the following steps:
	- probes each node of each entry in the module-level ``clusters``;
	- renders the cluster's ``upstream`` block;
	- writes and copies it into the cluster's config_dir when an update is
	  due;
	- validates and reloads nginx if any cluster was written.
	"""

	# Seconds to wait on the idle agent at cluster['port'].  The bundled agent
	# script runs "vmstat 5 2" before replying, so this must exceed 5 seconds.
	agent_timeout = 30.0
	# Seconds to wait for the HTTP HEAD probe.
	http_timeout = 10.0

	def __init__(self, pidfile_path):
		# Attributes consumed by daemon.runner.DaemonRunner.
		self.stdin_path = '/dev/null'
		self.stdout_path = '/dev/null'
		self.stderr_path = '/dev/null'
		self.pidfile_path = pidfile_path
		self.pidfile_timeout = 5

	def setup_daemon_context(self, daemon_context):
		"""Keep the DaemonContext; run() and handle_exit() use its stop_event."""
		self.daemon_context = daemon_context

	def run(self):
		"""Run a check pass every CHECK_INTERVAL seconds until stop_event is set."""
		logger.info('lbactive service has started')
		logger.debug('event from the run() = {}'.format(self.daemon_context.stop_event))

		while not self.daemon_context.stop_event.wait(float(CHECK_INTERVAL)):
			try:
				self._check_all_clusters()
			except Exception as exc:
				# Keep the daemon alive across unexpected errors; the next
				# interval retries.  SystemExit/KeyboardInterrupt still propagate.
				logger.exception(exc)
		logger.info('lbactive service has been stopped')

	def _check_all_clusters(self):
		"""Probe every node, rewrite upstream files that need it, reload nginx."""
		reload_service = False
		for cluster_name, cluster in clusters.items():
			for node in cluster['nodes']:
				self._check_node(cluster, node)
			cluster['config'] = self._render_upstream(cluster_name, cluster)

			now = int(time.time()) // 60  # minutes, same unit as update_interval
			interval_due = now - int(cluster['last_update']) >= int(cluster['update_interval'])
			if cluster['forced_update'] or (interval_due and cluster['update']):
				logger.info("Updating cluster: " + cluster_name)
				cluster['last_update'] = now
				cluster['update'] = False
				cluster['forced_update'] = False
				self._write_config(cluster_name, cluster)
				reload_service = True
			else:
				logger.debug("No update needed")

		if reload_service:
			self._reload_nginx()

	def _check_node(self, cluster, node):
		"""Probe one node and record the result on *node*.

		The node counts as up only if its idle agent answers with a number AND
		an HTTP HEAD to it succeeds; otherwise it is marked 'down'.  A change
		between up and down sets cluster['forced_update'].
		"""
		port = str(cluster["port"])
		logger.debug("Checking: " + node['ip'] + ":" + port)
		node['last_check'] = int(time.time()) // 60
		idle = self._read_idle(node['ip'], port)
		if idle is not None and not self._http_ok(node['ip']):
			idle = None

		action = False if idle is not None else 'down'
		if node['last_update_action'] != action:
			# Up/down transitions are written immediately, ignoring update_interval.
			cluster['forced_update'] = True
			node['last_update_action'] = action
		node['update_action'] = action
		node['current_idle'] = idle if idle is not None else False
		if action == 'down':
			logger.debug("Setting node: " + node['ip'] + " to down")

	def _read_idle(self, ip, port):
		"""Ask the node's agent for its idle percentage.

		Returns the integer value divided by ten (integer division), with 0
		raised to 1.  Returns None if the connection fails or the reply
		cannot be parsed by int() once a trailing '%' is removed.
		"""
		logger.debug("connecting to: " + ip + ':' + port)
		sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		try:
			# Without a timeout a single unresponsive node stalls every check.
			sock.settimeout(self.agent_timeout)
			sock.connect((ip, int(port)))
			data = sock.recv(1024)
			idle = int(re.sub('%$', '', data)) // 10
		except (socket.error, ValueError, OverflowError):
			logger.debug("Unable to connect to: " + ip)
			return None
		finally:
			sock.close()
		if idle == 0:
			idle = 1
		logger.debug(ip + ': ' + str(idle))
		return idle

	def _http_ok(self, ip):
		"""Return True if an HTTP HEAD to *ip* succeeds with a non-4xx/5xx status."""
		try:
			r = requests.head("http://" + ip, timeout=self.http_timeout)
			r.raise_for_status()
		except requests.exceptions.RequestException:
			logger.debug("Unable to retrieve HTTP response from: " + ip)
			return False
		logger.debug(ip + ' status code: ' + str(r.status_code))
		return True

	@staticmethod
	def _render_upstream(cluster_name, cluster):
		"""Return the nginx ``upstream`` block text for *cluster*.

		Side effects: an up node with no recorded weight yet (last_idle 0)
		adopts its current idle value. So does an up node whose idle value
		moved by at least cluster['load_diff']. In both cases
		cluster['update'] is set, because nginx rejects weight=0.
		"""
		lines = ["\tupstream " + cluster_name + " {"]
		lb_type = cluster['lb_type']
		# round_robin is nginx's implicit default, not a valid directive.
		if lb_type and lb_type != "round_robin":
			lines.append("\t\t" + lb_type + ";")
		load_diff = int(cluster['load_diff'])
		for node in cluster['nodes']:
			if node['update_action'] == 'down':
				lines.append("\t\tserver " + node['ip'] + ' down;')
				continue
			if not node['last_idle'] or abs(int(node['current_idle']) - int(node['last_idle'])) >= load_diff:
				cluster['update'] = True
				node['last_idle'] = node['current_idle']
			lines.append("\t\tserver " + node['ip'] + ' weight=' + str(node['last_idle']) + node['xtra_options'] + ';')
		lines.append("\t}\n")
		return '\n'.join(lines)

	@staticmethod
	def _write_config(cluster_name, cluster):
		"""Write cluster['config'] to TMP_DIR, then copy it into config_dir."""
		logger.info("writing configs")
		tmp_path = TMP_DIR + "/" + cluster_name + ".conf"
		with open(tmp_path, "w") as config_file:
			config_file.write(cluster['config'])
		shutil.copy(tmp_path, cluster["config_dir"])

	@staticmethod
	def _reload_nginx():
		"""Run ``nginx -tq`` and reload nginx only if the test passes."""
		logger.info("Checking Nginx Configuration")
		if call([NGINX, '-tq']) == 0:
			logger.info("Reloading Nginx")
			call([NGINX, '-s', 'reload'])
		else:
			logger.error("Nginx configuration test failed; not reloading")

	def handle_exit(self, signum, frame):
		"""Signal handler: ask run() to leave its loop after the current pass."""
		try:
			logger.info('lbactive stopping...')
			self.daemon_context.stop_event.set()
		except BaseException as exc:
			logger.exception(exc)

if __name__ == '__main__':
    # Build the application and hand it to python-daemon's runner, whose
    # do_action() dispatches on the command given on the command line.
    app = Daemon(PID_FILE)
    d = runner.DaemonRunner(app)
    ctx = d.daemon_context
    # Pass the logging handlers' streams as files_preserve so they stay
    # usable inside the daemon.
    ctx.files_preserve = [handler.stream for handler in logging.root.handlers]
    # SIGUSR1 is routed to Daemon.handle_exit, which sets stop_event.
    ctx.signal_map = {signal.SIGUSR1: app.handle_exit}
    ctx.stop_event = multiprocessing.Event()
    app.setup_daemon_context(ctx)
    d.do_action()
#!/bin/sh
### BEGIN INIT INFO
# Provides:          lbactive
# Required-Start:    $remote_fs $syslog nginx
# Required-Stop:     $remote_fs $syslog
# Default-Start:     2 3 4 5
# Default-Stop:      0 1 6
# Short-Description: Load Balancer Active Checks for NginX
# Description:       Enables active health and connection checks for NginX
### END INIT INFO

# SysV init script for lbactive.  Starting and stopping are delegated to the
# daemon's own "start"/"stop" commands; the LSB helpers report status.

# Using the lsb functions to perform the operations.
. /lib/lsb/init-functions
# Process name ( For display )
NAME=lbactive
# Daemon name, where is the actual executable
DAEMON=/opt/sbin/lbactive
# pid file for the daemon
PIDFILE=/var/run/lbactive.pid

# If the daemon is not there, then exit (LSB: 5 = program is not installed).
test -x "$DAEMON" || exit 5

case "$1" in
 start)
  # If a pid file exists and its process is alive, there is nothing to do.
  if [ -e "$PIDFILE" ]; then
   status_of_proc -p "$PIDFILE" "$DAEMON" "$NAME process" && status=0 || status=$?
   if [ "$status" = 0 ]; then
    exit 0
   fi
  fi
  log_daemon_msg "Starting the process" "$NAME"
  if "$DAEMON" start; then
   log_end_msg 0
  else
   log_end_msg 1
  fi
  ;;
 stop)
  if [ -e "$PIDFILE" ]; then
   status_of_proc -p "$PIDFILE" "$DAEMON" "Stopping the $NAME process" && status=0 || status=$?
   if [ "$status" = 0 ]; then
    "$DAEMON" stop
    /bin/rm -f "$PIDFILE"
   fi
  else
   log_daemon_msg "$NAME process is not running"
   log_end_msg 0
  fi
  ;;
 restart)
  "$0" stop && sleep 2 && "$0" start
  ;;
 status)
  if [ -e "$PIDFILE" ]; then
   status_of_proc -p "$PIDFILE" "$DAEMON" "$NAME process" && exit 0 || exit $?
  else
   log_daemon_msg "$NAME Process is not running"
   log_end_msg 0
   # LSB: status must exit 3 when the program is not running.
   exit 3
  fi
  ;;
 reload|force-reload)
  # The daemon reads its configuration only at start-up, and the SIGUSR1 it
  # handles makes it stop rather than reload, so a reload is a restart.
  "$0" restart
  ;;
 *)
  # For invalid arguments, print the usage message.
  echo "Usage: $0 {start|stop|restart|reload|force-reload|status}"
  exit 2
  ;;
esac
[Unit]
Description=Load Balancing Active Checks
# lbactive rewrites nginx upstream files and reloads nginx.
After=network.target nginx.service

[Service]
# "lbactive start" detaches into the background (python-daemon) and writes a
# pid file, so systemd must track the forked process through that file.
Type=forking
# Must match "pidfile" in /etc/lbactive/lbactive.cfg.
PIDFile=/run/lbactive.pid
ExecStart=/opt/sbin/lbactive start
ExecStop=/opt/sbin/lbactive stop

[Install]
WantedBy=multi-user.target
[main]

# Path to PID file
pidfile = /run/lbactive.pid

# Path to log file
logfile = /var/log/lbactive

# debug, info, warn, error or critical
loglevel = info

# Check Interval in seconds (default = 5 seconds)
checkinterval = 5

# Update Interval. Minimum time (in minutes) before config is updated if needed
# Ignored if a node is registered as being down
# Can be overridden on a per-cluster basis
# (default = 2 minutes)
updateinterval = 2

# Minimum change in a node's idle load before its weight is updated
# Values from 1 - 10 allowed.
# A value of 1 is equivalent to 10% idle time on the node,
# so a value of 4 = 4 x 10 = 40% idle difference.
# Can be overridden on a per-cluster basis
# (default = 2)
loaddiff = 4

nginx_location = /usr/sbin/nginx

[upstream1]
clustertype = nginx
config_dir = /etc/nginx/upstreams-enabled/
port = 3333
loaddiff = 4
updateinterval = 1

#[upstream2]
#clustertype = nginx
#config_dir = /etc/nginx/upstreams-enabled/
#port = 3333
#loaddiff = 4
#updateinterval = 2
#!/bin/bash
# Load agent: prints this host's CPU idle percentage followed by "%".
# The lbactive daemon reads this reply and strips the trailing '%'.
# "vmstat 5 2" takes two samples 5 seconds apart; tail keeps the second one.
# awk prints column 15, which is "id" (idle) in procps vmstat output
# (verify the column on your platform).
LOAD=$(/usr/bin/vmstat 5 2 | /usr/bin/tail -1 | /usr/bin/awk '{print $15}')
echo "$LOAD%"