====== LBActive ====== **For the latest version go to the [[https://sourceforge.net/projects/lbactive/?source=navbar|LBActive Project at Sourceforge]] ** LBActive is an dynamic and active load balancer health check system written in Python. Currently it supports only Nginx. The next release (if there is one) will move to a more modular approach to enable support for other load balancers that do not have dynamic and/or active checks and other types of backend checks if needed. * /opt/sbin/lbactive #!/usr/bin/env python import argparse import ConfigParser import logging import logging.handlers import multiprocessing import os import pprint import re import requests import shutil import signal import socket import string import tempfile import time from daemon import runner from subprocess import call HOST = "localhost" PORT = 3333 VERSION = '0.2 beta' # Settings if os.path.isfile('/etc/lbactive/lbactive.cfg'): config = ConfigParser.RawConfigParser() config.readfp(open('/etc/lbactive/lbactive.cfg')) else: print "/etc/lbactive/lbactive.cfg not found! Exiting!!" exit() if config.has_option("main", "logfile"): LOG_FILENAME = config.get("main", "logfile") else: LOG_FILENAME = "/var/log/lbactive" if config.has_option("main", "pidfile"): PID_FILE = config.get("main", "pidfile") else: PID_FILE = '/var/run/lbactive.pid' if config.has_option("main", "loglevel"): LOG_LEVEL = config.get("main", "loglevel") else: LOG_LEVEL = "INFO" if config.has_option("main", "checkinterval"): CHECK_INTERVAL = config.get("main", "checkinterval") else: CHECK_INTERVAL = 5 if config.has_option("main", "updateinterval"): UPDATE_INTERVAL = config.get("main", "updateinterval") else: UPDATE_INTERVAL = 2 if config.has_option("main", "loaddiff"): LOAD_DIFF = config.get("main", "loaddiff") else: LOAD_DIFF = 2 if config.has_option("main", "nginx_location"): NGINX = config.get("main", "nginx_location") else: NGINX = "/usr/sbin/nginx" TMP_DIR = tempfile.gettempdir() LOG_LEVEL_NUMERIC = getattr(logging, LOG_LEVEL.upper(), None) if not isinstance(LOG_LEVEL_NUMERIC, int): raise ValueError('Invalid log level: $s' % LOG_LEVEL) logger = logging.getLogger('lbactive') logging.getLogger("urllib3").setLevel(logging.WARNING) logging.basicConfig(level=LOG_LEVEL_NUMERIC, format='%(asctime)s %(name)s %(levelname)s: %(message)s', filename=LOG_FILENAME) clusters = {} current = {} L = list() for section in config.sections(): if section != "main": current['config_dir'] = config.get(section, "config_dir") logger.info("Loading Nginx Upstream Config: " + current["config_dir"] + "/" + section + ".conf") if config.has_option(section, "clustertype"): current['cluster_type'] = config.get(section, "clustertype") if config.has_option(section, "port"): current['port'] = config.get(section, "port") else: current['port'] = PORT if config.has_option(section, "loaddiff"): current["load_diff"] = config.get(section, "loaddiff") else: current["load_diff"] = LOAD_DIFF if config.has_option(section, "updateinterval"): current["update_interval"] = config.get(section, "updateinterval") else: current["update_interval"] = UPDATE_INTERVAL logger.debug("Health Check Port: " + upstreams[(section, "port")]) current['last_update'] = 0 current['update'] = False current['forced_update'] = False current['lb_type'] = "" if os.path.isfile(current["config_dir"] + "/" + section + ".conf"): with open(current["config_dir"] + "/" + section + ".conf") as upstream: for cfg_line in upstream: xtra_options = '' cfg_line = cfg_line.strip() cfg_line = re.sub(';$', '', cfg_line) first_word = cfg_line.partition(' ')[0] if first_word == "server": cfg_line = re.sub('^server ', '', cfg_line) for word in cfg_line.split(" "): if "weight=" not in word and "down" not in word and cfg_line.partition(' ')[0] != word: xtra_options = " ".join([xtra_options, word]) L.append({'ip':cfg_line.partition(' ')[0],'last_check':False,'last_idle':0,'last_update_action':False, 'xtra_options':xtra_options}) elif first_word == "ip_hash" or first_word == "least_conn" or first_word == "round_robin": current["lb_type"] = first_word elif first_word == "hash": current['lb_type'] = cfg_line current["nodes"] = L clusters[section] = current current = {} L = [] else: logger.warning("NginX config file not found: " + current["config_dir"] + "/" + section + ".conf") elif section == "main": zed = 0 else: logger.critical("No clusters configured! Exiting.") print "No clusters configured! Exiting." exit class Daemon(object): def __init__(self, pidfile_path): self.stdin_path = '/dev/null' self.stdout_path = '/dev/null' self.stderr_path = '/dev/null' self.pidfile_path = None self.pidfile_timeout = 5 self.pidfile_path = PID_FILE def setup_daemon_context(self, daemon_context): self.daemon_context = daemon_context def run(self): logger.info('lbactive service has started') logger.debug('event from the run() = {}'.format(self.daemon_context.stop_event)) while not self.daemon_context.stop_event.wait(float(CHECK_INTERVAL)): try: write_configs = False reload_service = False for cluster_name, cluster in clusters.items(): #print cluster_name for node in cluster['nodes']: logger.debug("Checking: " + node['ip'] + ":" + cluster["port"]) node['last_check'] = int(time.time()) / 60 try: sock = socket.socket (socket.AF_INET, socket.SOCK_STREAM) logger.debug("connecting to: " + node['ip'] + ':' + cluster["port"]) sock.connect((node['ip'], int(cluster["port"]))) data = sock.recv(1024) data = re.sub('%$', '', data) data = int(round(int(data) / 10)) if data == 0: data = 1 if node['last_update_action'] != False: cluster['forced_update'] = True node['last_update_action'] = False node['current_idle'] = data node['update_action'] = False logger.debug(node['ip'] + ': ' + str(data)) except: if node['last_update_action'] != 'down': cluster['forced_update'] = True node['last_update_action'] = 'down' node['current_idle'] = False node["update_action"] = "down" logger.debug("Unable to connect to: " + node['ip']) logger.debug("Setting node: " + node['ip'] + " to down") try: r = requests.head("http://" + node['ip']) r.raise_for_status() except (requests.exceptions.HTTPError, requests.exceptions.ConnectionError, requests.exceptions.Timeout): if node['last_update_action'] != 'down': cluster['forced_update'] = True node['last_update_action'] = 'down' node['current_idle'] = False node["update_action"] = "down" logger.debug("Unable to retrieve HTTP response from: " + node['ip']) logger.debug("Setting node: " + node['ip'] + " to down") else: if node['last_update_action'] != False or node['update_action'] != False: cluster['forced_update'] = True node['last_update_action'] = False node['update_action'] = False logger.debug(node['ip'] + ' status code: ' + str(r.status_code)) if cluster['lb_type'] == "ip_hash" or cluster['lb_type'] == "least_conn": cluster['lb_type'] = "\n " + cluster['lb_type'] + ";" cluster['config'] = " upstream " + cluster_name + " {" + cluster['lb_type'] for node in cluster['nodes']: if node['update_action'] == 'down' : cluster['config'] = '\n'.join([cluster['config'], " server " + node['ip'] + ' down;']) else: if abs(int(node['current_idle'])-int(node['last_idle'])) >= int(cluster['load_diff']): cluster['update'] = True node['last_idle'] = node['current_idle'] cluster['config'] = '\n'.join([cluster['config'], " server " + node['ip'] + ' weight=' + str(node['last_idle']) + node['xtra_options'] + ';']) cluster['config'] = '\n'.join([cluster['config'], " }\n"]) now = int(time.time()) / 60 if cluster['forced_update'] or (now - int(cluster['last_update']) >= int(cluster['update_interval']) and cluster['update']): #print cluster['config'] logger.info("Updating cluster: " + cluster_name) cluster['last_update'] = now cluster['update'] = False cluster['forced_update'] = False write_configs = True else: logger.debug("No update needed") if write_configs: logger.info("writing configs") config_file = open(TMP_DIR + "/" + cluster_name + ".conf", "w") config_file.write(cluster['config']) config_file.close() shutil.copy(TMP_DIR + "/" + cluster_name + ".conf", cluster["config_dir"]) reload_service = True write_configs = False if reload_service: logger.info("Checking Nginx Configuration") if call([NGINX, '-tq']) == 0: logger.info("Reloading Nginx") call([NGINX, '-s', 'reload']) reload_service = False #pp = pprint.PrettyPrinter(indent=4) #pp.pprint(clusters) except BaseException as exc: logger.exception(exc) logger.info('lbactive service has been stopped') def handle_exit(self, signum, frame): try: logger.info('lbactive stopping...') self.daemon_context.stop_event.set() except BaseException as exc: logger.exception(exc) if __name__ == '__main__': app = Daemon(PID_FILE) d = runner.DaemonRunner(app) #d.daemon_context.working_directory = lbactive_cfg.work_dir d.daemon_context.files_preserve = [h.stream for h in logging.root.handlers] d.daemon_context.signal_map = {signal.SIGUSR1: app.handle_exit} d.daemon_context.stop_event = multiprocessing.Event() app.setup_daemon_context(d.daemon_context) #logger.debug('event from the main = {}'.format(d.daemon_context.stop_event)) d.do_action() * /etc/init.d/lbactive ### BEGIN INIT INFO # Provides: lbactive # Required-Start: $remote_fs $syslog nginx # Required-Stop: $remote_fs $syslog # Default-Start: 2 3 4 5 # Default-Stop: 0 1 6 # Short-Description: Load Balancer Active Checks for NginX # Description: Enables active health and connection checks for NginX ### END INIT INFO # Using the lsb functions to perform the operations. . /lib/lsb/init-functions # Process name ( For display ) NAME=lbactive # Daemon name, where is the actual executable DAEMON=/opt/sbin/lbactive # pid file for the daemon PIDFILE=/var/run/lbactive.pid # If the daemon is not there, then exit. test -x $DAEMON || exit 5 case $1 in start) # Checked the PID file exists and check the actual status of process if [ -e $PIDFILE ]; then status_of_proc -p $PIDFILE $DAEMON "$NAME process" && status="0" || status="$?" # If the status is SUCCESS then don't need to start again. if [ $status = "0" ]; then exit # Exit fi fi # Start the daemon. log_daemon_msg "Starting the process" "$NAME" # Start the daemon with the help of start-stop-daemon # Log the message appropriately if $DAEMON start; then log_end_msg 0 else log_end_msg 1 fi ;; stop) # Stop the daemon. if [ -e $PIDFILE ]; then status_of_proc -p $PIDFILE $DAEMON "Stoppping the $NAME process" && status="0" || status="$?" if [ "$status" = 0 ]; then $DAEMON stop /bin/rm -rf $PIDFILE fi else log_daemon_msg "$NAME process is not running" log_end_msg 0 fi ;; restart) # Restart the daemon. $0 stop && sleep 2 && $0 start ;; status) # Check the status of the process. if [ -e $PIDFILE ]; then status_of_proc -p $PIDFILE $DAEMON "$NAME process" && exit 0 || exit $? else log_daemon_msg "$NAME Process is not running" log_end_msg 0 fi ;; reload) # Reload the process. Basically sending some signal to a daemon to reload # it configurations. if [ -e $PIDFILE ]; then start-stop-daemon --stop --signal USR1 --quiet --pidfile $PIDFILE --name $NAME log_success_msg "$NAME process reloaded successfully" else log_failure_msg "$PIDFILE does not exists" fi ;; *) # For invalid arguments, print the usage message. echo "Usage: $0 {start|stop|restart|reload|status}" exit 2 ;; esac * /etc/systemd/system/lbactive.service [Unit] Description=Load Balancing Active Checks [Service] ExecStart=/opt/sbin/lbactive start ExecStop=/opt/sbin/lbactive stop [Install] WantedBy=multi-user.target * /etc/lbactive/lbactive.cfg [main] # Path to PID file pidfile = /run/lbactive.pid # Path to log file logfile = /var/log/lbactive # debug, info, warn, error or critical loglevel = info # Check Interval in seconds (default = 5 seconds) checkinterval = 5 # Update Interval. Minimum time (in minutes) before config is updated if needed # Ignored if a node is registered as being down # Can be overriden on a per Cluster basis # (default = 2 minutes) updateinterval = 2 # Idle load difference before change is needed # Values from 1 - 10 allowed. # A value of 1 is the equivelant of 10% idle time on the node # So a value of 4 = 4 x 10 = 40% idle difference. # Can be overriden on a per Cluster basis # (default = 4) loaddiff = 4 nginx_location = /usr/sbin/nginx [upstream1] clustertype = nginx config_dir = /etc/nginx/upstreams-enabled/ port = 3333 loaddiff = 4 updateinterval = 1 #[upstream2] #clustertype = nginx #config_dir = /etc/nginx/upstreams-enabled/ #port = 3333 #loaddiff = 4 #updateinterval = 2 * /opt/bin/lb-feedback #!/bin/bash LOAD=`/usr/bin/vmstat 5 2| /usr/bin/tail -1| /usr/bin/awk '{print $15;}' | /usr/bin/tee` echo "$LOAD%"