====== LBActive ======
**For the latest version go to the [[https://sourceforge.net/projects/lbactive/?source=navbar|LBActive Project at Sourceforge]]**
LBActive is a dynamic and active load balancer health check system written in Python. Currently it supports only Nginx.
The next release (if there is one) will move to a more modular approach to enable support for other load balancers that do not have dynamic and/or active checks and other types of backend checks if needed.
* /opt/sbin/lbactive
#!/usr/bin/env python
import argparse
import ConfigParser
import logging
import logging.handlers
import multiprocessing
import os
import pprint
import re
import requests
import shutil
import signal
import socket
import string
import tempfile
import time
from daemon import runner
from subprocess import call
# Built-in defaults. PORT is the health-check port used for any cluster
# section that does not set its own "port" option.
HOST = "localhost"
PORT = 3333
VERSION = '0.2 beta'

# Settings
_CONFIG_PATH = '/etc/lbactive/lbactive.cfg'

if os.path.isfile(_CONFIG_PATH):
    config = ConfigParser.RawConfigParser()
    # Parse inside a "with" block so the file handle is closed explicitly
    # rather than relying on garbage collection.
    with open(_CONFIG_PATH) as _config_file:
        config.readfp(_config_file)
else:
    print("/etc/lbactive/lbactive.cfg not found! Exiting!!")
    # A missing configuration is fatal: leave with a non-zero status so the
    # caller (e.g. an init script) can tell that start-up failed.
    raise SystemExit(1)


def _main_option(option, default):
    """Return ``option`` from the [main] section, or ``default`` if unset.

    Values read from the file are strings while the built-in defaults may be
    ints, so consumers must convert (``int()`` / ``float()``) before use.
    """
    if config.has_option("main", option):
        return config.get("main", option)
    return default


LOG_FILENAME = _main_option("logfile", "/var/log/lbactive")
PID_FILE = _main_option("pidfile", '/var/run/lbactive.pid')
LOG_LEVEL = _main_option("loglevel", "INFO")
# Seconds between two health-check rounds.
CHECK_INTERVAL = _main_option("checkinterval", 5)
# Minimum number of minutes between two non-forced config rewrites; used as
# the default for clusters that do not set their own "updateinterval".
UPDATE_INTERVAL = _main_option("updateinterval", 2)
# Minimum weight change that marks a cluster as needing an update; used as
# the default for clusters that do not set their own "loaddiff".
LOAD_DIFF = _main_option("loaddiff", 2)
# Path of the nginx binary used to test and reload the configuration.
NGINX = _main_option("nginx_location", "/usr/sbin/nginx")
# Directory in which regenerated upstream files are staged before being
# copied into a cluster's config_dir.
TMP_DIR = tempfile.gettempdir()

# Translate the configured level name (e.g. "info") into the numeric constant
# exposed by the logging module.
LOG_LEVEL_NUMERIC = getattr(logging, LOG_LEVEL.upper(), None)
if not isinstance(LOG_LEVEL_NUMERIC, int):
    # The placeholder must be "%s": with the former "$s" the %-operator itself
    # raised TypeError and the offending level name was never reported.
    raise ValueError('Invalid log level: %s' % LOG_LEVEL)

logger = logging.getLogger('lbactive')
# Only let warnings and above through from the "urllib3" logger.
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.basicConfig(
    level=LOG_LEVEL_NUMERIC,
    format='%(asctime)s %(name)s %(levelname)s: %(message)s',
    filename=LOG_FILENAME,
)
def _cluster_option(section, option, default):
    """Return ``option`` from ``section``, or ``default`` when it is not set."""
    if config.has_option(section, option):
        return config.get(section, option)
    return default


def _parse_upstream(conf_path):
    """Read an Nginx upstream file and return ``(nodes, lb_type)``.

    ``nodes`` holds one state dict per ``server`` line; ``lb_type`` is the
    balancing directive found in the file (``ip_hash``, ``least_conn``,
    ``round_robin`` or a full ``hash ...`` line), or ``""`` if there is none.
    """
    nodes = []
    lb_type = ""
    with open(conf_path) as upstream:
        for cfg_line in upstream:
            # Work on the bare directive: no surrounding blanks, no final ";".
            cfg_line = re.sub(';$', '', cfg_line.strip())
            first_word = cfg_line.partition(' ')[0]
            if first_word == "server":
                cfg_line = re.sub('^server ', '', cfg_line)
                address = cfg_line.partition(' ')[0]
                # Keep every server parameter except the ones the daemon
                # manages itself (weight / down) and the address. Each kept
                # word is prefixed with a blank so the result can be appended
                # directly after the generated weight.
                xtra_options = ''
                for word in cfg_line.split(" "):
                    if "weight=" not in word and "down" not in word and word != address:
                        xtra_options = " ".join([xtra_options, word])
                nodes.append({
                    'ip': address,
                    'last_check': False,
                    'last_idle': 0,
                    'last_update_action': False,
                    'xtra_options': xtra_options,
                })
            elif first_word in ("ip_hash", "least_conn", "round_robin"):
                lb_type = first_word
            elif first_word == "hash":
                # Keep the whole line: the hash key is part of the directive.
                lb_type = cfg_line
    return nodes, lb_type


# Cluster name (the config section name) -> mutable cluster state consumed by
# the daemon loop.
clusters = {}
for section in config.sections():
    if section == "main":
        continue

    # A fresh dict per section, so nothing leaks over from a previous section
    # whose upstream file turned out to be missing.
    current = {}
    # "config_dir" is mandatory: config.get() raises when it is absent.
    current['config_dir'] = config.get(section, "config_dir")
    conf_path = current["config_dir"] + "/" + section + ".conf"
    logger.info("Loading Nginx Upstream Config: " + conf_path)
    if config.has_option(section, "clustertype"):
        current['cluster_type'] = config.get(section, "clustertype")
    # Always a string, whether it comes from the file or from the int default,
    # because it is concatenated into log messages later on.
    current['port'] = str(_cluster_option(section, "port", PORT))
    current["load_diff"] = _cluster_option(section, "loaddiff", LOAD_DIFF)
    current["update_interval"] = _cluster_option(section, "updateinterval", UPDATE_INTERVAL)
    # Previously this line read an undefined name ("upstreams") and raised
    # NameError for the first cluster section.
    logger.debug("Health Check Port: " + current['port'])
    current['last_update'] = 0
    current['update'] = False
    current['forced_update'] = False
    current['lb_type'] = ""

    if not os.path.isfile(conf_path):
        logger.warning("NginX config file not found: " + conf_path)
        continue

    current["nodes"], current["lb_type"] = _parse_upstream(conf_path)
    clusters[section] = current

# The original check lived in an unreachable "else" branch and ended in a bare
# "exit" that was never called; test the result of the loop instead.
if not clusters:
    logger.critical("No clusters configured! Exiting.")
    print("No clusters configured! Exiting.")
    raise SystemExit(1)
class Daemon(object):
    """Application object handed to ``daemon.runner.DaemonRunner``.

    Every check interval it probes each node of each configured cluster,
    regenerates the cluster's Nginx upstream block and, when an update is due,
    writes the file and reloads Nginx.

    NOTE(review): the published listing had lost its indentation. The nesting
    below was reconstructed from the statement order; in particular the HTTP
    probe is assumed to run for every node after the load probe - confirm
    against the released source.
    """

    # Seconds to wait for a node's load-feedback socket or HTTP response.
    # Without a timeout one unresponsive node would block the check loop
    # indefinitely.
    check_timeout = 5

    def __init__(self, pidfile_path):
        """Store the stream and pidfile settings read from the app object.

        pidfile_path -- location of the pidfile (the argument used to be
                        ignored in favour of the PID_FILE global).
        """
        self.stdin_path = '/dev/null'
        self.stdout_path = '/dev/null'
        self.stderr_path = '/dev/null'
        self.pidfile_path = pidfile_path
        self.pidfile_timeout = 5

    def setup_daemon_context(self, daemon_context):
        """Remember the daemon context; run() polls its ``stop_event``."""
        self.daemon_context = daemon_context

    def run(self):
        """Run check rounds until the context's stop event is set."""
        logger.info('lbactive service has started')
        logger.debug('event from the run() = {}'.format(self.daemon_context.stop_event))
        # wait() doubles as the sleep between rounds and returns True as soon
        # as handle_exit() sets the event.
        while not self.daemon_context.stop_event.wait(float(CHECK_INTERVAL)):
            try:
                reload_service = False
                for cluster_name, cluster in clusters.items():
                    for node in cluster['nodes']:
                        self._check_load(cluster, node)
                        self._check_http(cluster, node)
                    self._build_config(cluster_name, cluster)
                    if self._update_due(cluster_name, cluster):
                        self._write_config(cluster_name, cluster)
                        reload_service = True
                if reload_service:
                    logger.info("Checking Nginx Configuration")
                    # Only reload when Nginx accepts the new configuration.
                    if call([NGINX, '-tq']) == 0:
                        logger.info("Reloading Nginx")
                        call([NGINX, '-s', 'reload'])
            except Exception as exc:
                # One failed round must not kill the daemon; log and retry on
                # the next interval.
                logger.exception(exc)
        logger.info('lbactive service has been stopped')

    def _mark_down(self, cluster, node):
        """Flag node as down, forcing a config update on a state change."""
        if node['last_update_action'] != 'down':
            cluster['forced_update'] = True
            node['last_update_action'] = 'down'
        node['current_idle'] = False
        node["update_action"] = "down"

    def _check_load(self, cluster, node):
        """Read the node's idle figure from its feedback port.

        The reply (a percentage, optionally followed by "%") is scaled down
        by ten and stored in node['current_idle']; any failure marks the node
        as down.
        """
        # The port may be an int (built-in default) or a string (config file).
        port = str(cluster["port"])
        logger.debug("Checking: " + node['ip'] + ":" + port)
        node['last_check'] = int(time.time()) // 60
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.check_timeout)
            logger.debug("connecting to: " + node['ip'] + ':' + port)
            sock.connect((node['ip'], int(port)))
            data = sock.recv(1024)
            data = re.sub('%$', '', data)
            data = int(data) // 10
            # A reading of 0 is raised to 1 so the node keeps a usable weight.
            if data == 0:
                data = 1
            if node['last_update_action'] != False:
                # The node was down before: publish its recovery right away.
                cluster['forced_update'] = True
                node['last_update_action'] = False
            node['current_idle'] = data
            node['update_action'] = False
            logger.debug(node['ip'] + ': ' + str(data))
        except Exception:
            self._mark_down(cluster, node)
            logger.debug("Unable to connect to: " + node['ip'])
            logger.debug("Setting node: " + node['ip'] + " to down")
        finally:
            # The socket used to be left open after every probe.
            if sock is not None:
                sock.close()

    def _check_http(self, cluster, node):
        """Send an HTTP HEAD request to the node and record the outcome."""
        try:
            r = requests.head("http://" + node['ip'], timeout=self.check_timeout)
            r.raise_for_status()
        except requests.exceptions.RequestException:
            # RequestException is the base class of the HTTPError,
            # ConnectionError and Timeout errors handled originally.
            self._mark_down(cluster, node)
            logger.debug("Unable to retrieve HTTP response from: " + node['ip'])
            logger.debug("Setting node: " + node['ip'] + " to down")
        else:
            if node['last_update_action'] != False or node['update_action'] != False:
                cluster['forced_update'] = True
                node['last_update_action'] = False
                node['update_action'] = False
            logger.debug(node['ip'] + ' status code: ' + str(r.status_code))

    def _build_config(self, cluster_name, cluster):
        """Render the upstream block for cluster into cluster['config'].

        Sets cluster['update'] when a node's weight moved by at least the
        cluster's load_diff since the weight that was last adopted.
        """
        lb_type = cluster['lb_type']
        if lb_type in ("ip_hash", "least_conn") or lb_type.startswith("hash "):
            directive = "\n " + lb_type + ";"
        else:
            # No directive, or round_robin, for which nothing is emitted.
            directive = ""
        lines = [" upstream " + cluster_name + " {" + directive]
        for node in cluster['nodes']:
            if node['update_action'] == 'down':
                lines.append(" server " + node['ip'] + ' down;')
                continue
            if abs(int(node['current_idle']) - int(node['last_idle'])) >= int(cluster['load_diff']):
                cluster['update'] = True
                node['last_idle'] = node['current_idle']
            # last_idle is 0 before the first adopted reading and False when
            # the load probe failed but the HTTP probe passed; fall back to a
            # weight of 1 instead of emitting "weight=0" / "weight=False".
            weight = node['last_idle'] or 1
            lines.append(" server " + node['ip'] + ' weight=' + str(weight) + node['xtra_options'] + ';')
        lines.append(" }\n")
        cluster['config'] = '\n'.join(lines)

    def _update_due(self, cluster_name, cluster):
        """Return True, resetting the cluster's flags, when a write is due.

        A forced update (node state change) is always due; a weight change is
        due once update_interval minutes have passed since the last write.
        """
        now = int(time.time()) // 60
        interval_passed = now - int(cluster['last_update']) >= int(cluster['update_interval'])
        if cluster['forced_update'] or (interval_passed and cluster['update']):
            logger.info("Updating cluster: " + cluster_name)
            cluster['last_update'] = now
            cluster['update'] = False
            cluster['forced_update'] = False
            return True
        logger.debug("No update needed")
        return False

    def _write_config(self, cluster_name, cluster):
        """Stage the rendered config in TMP_DIR and copy it to config_dir."""
        logger.info("writing configs")
        staged_path = TMP_DIR + "/" + cluster_name + ".conf"
        with open(staged_path, "w") as config_file:
            config_file.write(cluster['config'])
        shutil.copy(staged_path, cluster["config_dir"])

    def handle_exit(self, signum, frame):
        """Signal handler: ask run() to leave its loop."""
        try:
            logger.info('lbactive stopping...')
            self.daemon_context.stop_event.set()
        except Exception as exc:
            logger.exception(exc)
if __name__ == '__main__':
    # Hand the application object to the daemon runner, then adjust the
    # runner's daemon context before dispatching the requested action.
    app = Daemon(PID_FILE)
    daemon_runner = runner.DaemonRunner(app)
    context = daemon_runner.daemon_context
    # Keep the log handlers' streams open in the daemon process.
    context.files_preserve = [handler.stream for handler in logging.root.handlers]
    # SIGUSR1 asks the daemon to leave its check loop.
    context.signal_map = {signal.SIGUSR1: app.handle_exit}
    # Event polled by Daemon.run() and set by Daemon.handle_exit().
    context.stop_event = multiprocessing.Event()
    app.setup_daemon_context(context)
    daemon_runner.do_action()
* /etc/init.d/lbactive
#!/bin/sh
### BEGIN INIT INFO
# Provides: lbactive
# Required-Start: $remote_fs $syslog nginx
# Required-Stop: $remote_fs $syslog
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: Load Balancer Active Checks for NginX
# Description: Enables active health and connection checks for NginX
### END INIT INFO

# Using the lsb functions to perform the operations.
. /lib/lsb/init-functions

# Process name ( For display )
NAME=lbactive
# Daemon name, where is the actual executable
DAEMON=/opt/sbin/lbactive
# pid file for the daemon
PIDFILE=/var/run/lbactive.pid

# If the daemon is not there, then exit.
test -x "$DAEMON" || exit 5

case "$1" in
    start)
        # If the PID file exists, check the actual status of the process.
        if [ -e "$PIDFILE" ]; then
            status_of_proc -p "$PIDFILE" "$DAEMON" "$NAME process" && status="0" || status="$?"
            # Already running: nothing to start.
            if [ "$status" = "0" ]; then
                exit 0
            fi
        fi
        log_daemon_msg "Starting the process" "$NAME"
        # The daemon script handles "start" itself; report its result.
        if "$DAEMON" start; then
            log_end_msg 0
        else
            log_end_msg 1
        fi
        ;;
    stop)
        # Stop the daemon, but only when the PID file points at a live process.
        if [ -e "$PIDFILE" ]; then
            status_of_proc -p "$PIDFILE" "$DAEMON" "Stopping the $NAME process" && status="0" || status="$?"
            if [ "$status" = "0" ]; then
                "$DAEMON" stop
                /bin/rm -f "$PIDFILE"
            fi
        else
            log_daemon_msg "$NAME process is not running"
            log_end_msg 0
        fi
        ;;
    restart)
        # Restart the daemon.
        "$0" stop && sleep 2 && "$0" start
        ;;
    status)
        # Check the status of the process.
        if [ -e "$PIDFILE" ]; then
            status_of_proc -p "$PIDFILE" "$DAEMON" "$NAME process"
            exit $?
        else
            log_daemon_msg "$NAME Process is not running"
            log_end_msg 0
            # LSB status convention: 3 means "program is not running".
            exit 3
        fi
        ;;
    reload)
        # Send SIGUSR1 to the daemon.
        # NOTE(review): the daemon maps SIGUSR1 to its handle_exit() handler,
        # which sets the stop event, so this appears to stop the daemon rather
        # than re-read its configuration - confirm the intended behaviour.
        if [ -e "$PIDFILE" ]; then
            # Only claim success when the signal was actually delivered.
            if start-stop-daemon --stop --signal USR1 --quiet --pidfile "$PIDFILE" --name "$NAME"; then
                log_success_msg "$NAME process reloaded successfully"
            else
                log_failure_msg "Failed to signal the $NAME process"
            fi
        else
            log_failure_msg "$PIDFILE does not exist"
        fi
        ;;
    *)
        # For invalid arguments, print the usage message.
        echo "Usage: $0 {start|stop|restart|reload|status}"
        exit 2
        ;;
esac
* /etc/systemd/system/lbactive.service
# systemd unit that starts and stops lbactive through the script's own
# "start" and "stop" actions.
[Unit]
Description=Load Balancing Active Checks
[Service]
# NOTE(review): no Type= is set, so systemd's default service type applies;
# confirm this matches how "lbactive start" behaves when launched by systemd.
ExecStart=/opt/sbin/lbactive start
ExecStop=/opt/sbin/lbactive stop
[Install]
WantedBy=multi-user.target
* /etc/lbactive/lbactive.cfg
[main]
# Path to PID file
pidfile = /run/lbactive.pid
# Path to log file
logfile = /var/log/lbactive
# debug, info, warn, error or critical
loglevel = info
# Check Interval in seconds (default = 5 seconds)
checkinterval = 5
# Update Interval. Minimum time (in minutes) before config is updated if needed
# Ignored if a node is registered as being down
# Can be overridden on a per Cluster basis
# (default = 2 minutes)
updateinterval = 2
# Idle load difference before change is needed
# Values from 1 - 10 allowed.
# A value of 1 is the equivalent of 10% idle time on the node
# So a value of 4 = 4 x 10 = 40% idle difference.
# Can be overridden on a per Cluster basis
# (default = 2)
loaddiff = 4
nginx_location = /usr/sbin/nginx
[upstream1]
clustertype = nginx
config_dir = /etc/nginx/upstreams-enabled/
port = 3333
loaddiff = 4
updateinterval = 1
#[upstream2]
#clustertype = nginx
#config_dir = /etc/nginx/upstreams-enabled/
#port = 3333
#loaddiff = 4
#updateinterval = 2
* /opt/bin/lb-feedback
#!/bin/bash
# Print one CPU figure followed by "%", e.g. "87%".
#
# vmstat is asked for two reports five seconds apart and only the last line
# is used. Field 15 is presumably the CPU idle column ("id") of the default
# vmstat layout - confirm on the target platform.
idle_pct=$(/usr/bin/vmstat 5 2 | /usr/bin/tail -1 | /usr/bin/awk '{print $15;}')
echo "${idle_pct}%"