More code reorg: move Gearman code to worker.py
Move the Gearman code used by the worker out of main.py and into worker.py where it belongs. Change-Id: Ied1e32034e7c2f497fcc6c183fc6c5f48cc08129
This commit is contained in:
		@@ -14,26 +14,16 @@
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import daemon
 | 
					import daemon
 | 
				
			||||||
import daemon.pidfile
 | 
					import daemon.pidfile
 | 
				
			||||||
import gearman.errors
 | 
					 | 
				
			||||||
import grp
 | 
					import grp
 | 
				
			||||||
import pwd
 | 
					import pwd
 | 
				
			||||||
import socket
 | 
					 | 
				
			||||||
from time import sleep
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
from libra.openstack.common import importutils
 | 
					from libra.openstack.common import importutils
 | 
				
			||||||
from libra.common.json_gearman import JSONGearmanWorker
 | 
					 | 
				
			||||||
from libra.common.options import Options, setup_logging
 | 
					from libra.common.options import Options, setup_logging
 | 
				
			||||||
from libra.worker.worker import config_manager
 | 
					from libra.worker.worker import config_manager
 | 
				
			||||||
from libra.worker.drivers.base import known_drivers
 | 
					from libra.worker.drivers.base import known_drivers
 | 
				
			||||||
from libra.worker.drivers.haproxy.services_base import haproxy_services
 | 
					from libra.worker.drivers.haproxy.services_base import haproxy_services
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class CustomJSONGearmanWorker(JSONGearmanWorker):
 | 
					 | 
				
			||||||
    """ Custom class we will use to pass arguments to the Gearman task. """
 | 
					 | 
				
			||||||
    logger = None
 | 
					 | 
				
			||||||
    driver = None
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
class Server(object):
 | 
					class Server(object):
 | 
				
			||||||
    """
 | 
					    """
 | 
				
			||||||
    Encapsulates server activity so we can run it in either daemon or
 | 
					    Encapsulates server activity so we can run it in either daemon or
 | 
				
			||||||
@@ -48,31 +38,10 @@ class Server(object):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    def main(self):
 | 
					    def main(self):
 | 
				
			||||||
        """ Main method of the server.  """
 | 
					        """ Main method of the server.  """
 | 
				
			||||||
        my_ip = socket.gethostbyname(socket.gethostname())
 | 
					        config_manager(self.logger,
 | 
				
			||||||
        task_name = "lbaas-%s" % my_ip
 | 
					                       self.driver,
 | 
				
			||||||
        self.logger.info("Registering task %s" % task_name)
 | 
					                       self.servers,
 | 
				
			||||||
 | 
					                       self.reconnect_sleep)
 | 
				
			||||||
        worker = CustomJSONGearmanWorker(self.servers)
 | 
					 | 
				
			||||||
        worker.set_client_id(my_ip)
 | 
					 | 
				
			||||||
        worker.register_task(task_name, config_manager)
 | 
					 | 
				
			||||||
        worker.logger = self.logger
 | 
					 | 
				
			||||||
        worker.driver = self.driver
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        retry = True
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        while (retry):
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                worker.work()
 | 
					 | 
				
			||||||
            except KeyboardInterrupt:
 | 
					 | 
				
			||||||
                retry = False
 | 
					 | 
				
			||||||
            except gearman.errors.ServerUnavailable:
 | 
					 | 
				
			||||||
                self.logger.error("Job server(s) went away. Reconnecting.")
 | 
					 | 
				
			||||||
                sleep(self.reconnect_sleep)
 | 
					 | 
				
			||||||
                retry = True
 | 
					 | 
				
			||||||
            except Exception as e:
 | 
					 | 
				
			||||||
                self.logger.critical("Exception: %s, %s" % (e.__class__, e))
 | 
					 | 
				
			||||||
                retry = False
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        self.logger.info("Shutting down")
 | 
					        self.logger.info("Shutting down")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -12,12 +12,22 @@
 | 
				
			|||||||
# License for the specific language governing permissions and limitations
 | 
					# License for the specific language governing permissions and limitations
 | 
				
			||||||
# under the License.
 | 
					# under the License.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import gearman.errors
 | 
				
			||||||
import json
 | 
					import json
 | 
				
			||||||
 | 
					import socket
 | 
				
			||||||
 | 
					import time
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from libra.common.json_gearman import JSONGearmanWorker
 | 
				
			||||||
from libra.worker.controller import LBaaSController
 | 
					from libra.worker.controller import LBaaSController
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def config_manager(worker, job):
 | 
					class CustomJSONGearmanWorker(JSONGearmanWorker):
 | 
				
			||||||
 | 
					    """ Custom class we will use to pass arguments to the Gearman task. """
 | 
				
			||||||
 | 
					    logger = None
 | 
				
			||||||
 | 
					    driver = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def handler(worker, job):
 | 
				
			||||||
    """
 | 
					    """
 | 
				
			||||||
    Main Gearman worker task.
 | 
					    Main Gearman worker task.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -36,3 +46,30 @@ def config_manager(worker, job):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    logger.debug("Return JSON message: %s" % json.dumps(response, indent=4))
 | 
					    logger.debug("Return JSON message: %s" % json.dumps(response, indent=4))
 | 
				
			||||||
    return response
 | 
					    return response
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def config_manager(logger, driver, servers, reconnect_sleep):
 | 
				
			||||||
 | 
					    my_ip = socket.gethostbyname(socket.gethostname())
 | 
				
			||||||
 | 
					    task_name = "lbaas-%s" % my_ip
 | 
				
			||||||
 | 
					    logger.info("Registering task %s" % task_name)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    worker = CustomJSONGearmanWorker(servers)
 | 
				
			||||||
 | 
					    worker.set_client_id(my_ip)
 | 
				
			||||||
 | 
					    worker.register_task(task_name, handler)
 | 
				
			||||||
 | 
					    worker.logger = logger
 | 
				
			||||||
 | 
					    worker.driver = driver
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    retry = True
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    while (retry):
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            worker.work()
 | 
				
			||||||
 | 
					        except KeyboardInterrupt:
 | 
				
			||||||
 | 
					            retry = False
 | 
				
			||||||
 | 
					        except gearman.errors.ServerUnavailable:
 | 
				
			||||||
 | 
					            logger.error("Job server(s) went away. Reconnecting.")
 | 
				
			||||||
 | 
					            time.sleep(reconnect_sleep)
 | 
				
			||||||
 | 
					            retry = True
 | 
				
			||||||
 | 
					        except Exception as e:
 | 
				
			||||||
 | 
					            logger.critical("Exception: %s, %s" % (e.__class__, e))
 | 
				
			||||||
 | 
					            retry = False
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user