HOME/Articles/

mysql example zeus app v 1000 agentApiService (snippet)

Article Outline

Python mysql example 'zeus app v 1000 agentApiService'

Modules used in program:

  • import json
  • import hashlib

python zeus app v 1000 agentApiService

Python mysql example: zeus app v 1000 agentApiService

# -*- coding:utf-8 -*-
import hashlib
import json

from flask.ext import restful
from flask.ext.restful import reqparse

from .errorCode import CustomFlaskErr, ErrorCode
from app_v_1000.DBUtilsTools import DBPool, DBSingle

dbPoolTool = DBPool()
errorCode = ErrorCode()
conn = dbPoolTool.getConnection()


class CommonApiService(restful.Resource):

    def __init__(self):
        self.getHandler = GetHandler()

    def get(self):
        """
        解析请求参数
        funName
        parameters
        """
        parser = reqparse.RequestParser()
        parser.add_argument('funName', type=str, required=True, help='Agent monitor mysql funName is must required')
        parser.add_argument('parameters', type=str, required=True, help='Agent monitor mysql parameters is must required')
        args = parser.parse_args()
        print("args="+str(args))
        funnName = args.get('funName')
        parameters = eval(args.get('parameters'))

        if funnName == 'getMysqlConf':
            ip = parameters['mysql_ip']
            port = parameters['mysql_port']
            print(str("解析请求API的参数:" + str(args)))

            apiData = self.getHandler.getMysqlConf(ip,port)
            print("apiData="+str(apiData))
        elif funnName == 'getMysqlManageInfo':
            ip = parameters['mysql_ip']
            port = parameters['mysql_port']
            print(str("解析请求API的参数:" + str(args)))

            apiData = self.getHandler.getMysqlManageInfo(ip, port)
            print("apiData=" + str(apiData))


        if apiData:
            jsonData = json.dumps(apiData)
            print(str("slowLogAgentApi返回数据:" + str(apiData)))
            return apiData, 200
        else:
            raise CustomFlaskErr(errorCode.MYSQL_NOT_CONF_IN_ZEUS['return_code'],
                                 errorCode.MYSQL_NOT_CONF_IN_ZEUS['status_code'])



class SlowLogApiService(restful.Resource):

    def __init__(self):
        self.getHandler = GetHandler()

    def post(self):
        parser = reqparse.RequestParser()
        parser.add_argument('post_data')
        parser.add_argument('instance_id', type=str)
        parser.add_argument('dataType',type=str)
        args = parser.parse_args()

        id = args.get('instance_id')
        data = args.get('post_data')
        dataType = args.get('dataType')
        dataList = json.loads(data, encoding='utf-8')
        print(id)
        print(dataType)
        print(dataList)
        for dict in dataList:
            self.getHandler.handler_fingerprint(conn, dict['fingerprint'], int(id))
            self.getHandler.insertSlowLog(conn, dict, id)

class PerformanceQuotaApiService(restful.Resource):

    def __init__(self):
        self.getHandler = GetHandler()

    def post(self):
        parser = reqparse.RequestParser()
        parser.add_argument('post_data')
        parser.add_argument('instance_id', type=str)
        parser.add_argument('dataType',type=str)
        args = parser.parse_args()

        instance_id = args.get('instance_id')
        data = args.get('post_data')
        dataType = args.get('dataType')
        dataDict = json.loads(data, encoding='utf-8')
        print(id)
        print(dataType)
        print(dataDict)
        #处理请求写入数据库
        self.getHandler.insertPerformanceQuota(conn, dataDict, instance_id)


class GetHandler:

    def getMysqlConf(self,ip,port):
        conf = {}
        poolCursor = conn.cursor()
        getMysqlIdSql="select id,mysql_host,mysql_port,manage_user,manage_password from mysql_instances where mysql_host='%s' and mysql_port=%s;" % (ip,port)
        print(getMysqlIdSql)
        poolCursor.execute(getMysqlIdSql)
        #获取 ID 和 管理账户
        result = poolCursor.fetchone()
        print(result)
        print(str("获取mysql id 结果集:"+str(result)))
        if result:
            mysqlId = result[0]
            mysqlHost = result[1]
            mysqlPort = result[2]
            manageUser = result[3]
            managePassword = result[4]

            connSingle = DBSingle(mysqlHost, int(mysqlPort), manageUser, managePassword)
            singleConn = connSingle.getConnection()
            singleCursor = singleConn.cursor()
            getSlowQueryLogSql = "show variables like 'slow_query_log';"
            singleCursor.execute(getSlowQueryLogSql)
            result = singleCursor.fetchone()
            print(str("获取mysql slow_query_log 结果集:" + str(result)))
            conf['slow_query_log'] = result[1]

            getSlowQueryLogFileSql = "show variables like 'slow_query_log_file';"
            singleCursor.execute(getSlowQueryLogFileSql)
            result = singleCursor.fetchone()
            print(str("获取mysql slow_query_log_file 结果集:" + str(result)))
            conf['slow_query_log_file'] = result[1]
            conf['id'] = mysqlId

            print(conf)
            return conf
        else:
            print("conf="+str(conf))
            return conf

    def md5string(self, fingerprint):
        m2 = hashlib.md5()
        m2.update(fingerprint)
        return m2.hexdigest()

    def handler_fingerprint(self, conn, fingerprint, instance_id):
        cursor = conn.cursor()
        md5string = self.md5string(fingerprint)
        # 通过md5值查看sql是否在fingerprint表中,如果在 直接写入slowlog表 不存在 先写入 fingerprint表
        cursor.execute("select id,instance_id,md5code from slow_query_fingerprint(where instance_id=" + str()
            instance_id) + " and md5code='" + md5string + "';")
        result = cursor.fetchone()
        if result:
            # 已存在 处理数据并返回TRUE
            cursor.execute(
                "update slow_query_fingerprint(set last_apper_time=now(),total_number=total_number+1 where id=" + str()
                    result[0]))
            cursor.execute("commit;")
            cursor.close()
        else:
            # 不存在
            #判断instance_id的mysql是否开启新增慢查询报警
            #if self.check_alter(self,conn,instance_id)['alter_slowlog']:
                #self.alter_slowlog_find_first(self, instance_id, schema, found_sql, conn)

            cursor.execute(
                "insert slow_query_fingerprint(instance_id,md5code,sql_fingerprint,first_apper_time,last_apper_time,total_number) values(" + str(
                    instance_id) + ",'" + str(md5string) + "','" + str(fingerprint) + "',now(),now(),1)")
            cursor.execute("commit;")
            cursor.close()

    def insertSlowLog(self, conn, event, instance_id):
        fingerprint_md5 = self.md5string(event['fingerprint'])
        cursor = conn.cursor()
        qtime = ''
        if event.has_key('time'):
            qtime = event['time']

        insert_sql = "insert slow_log(instance_id,query_time,last_errno,rows_examined,rows_sent," \
                     "log_timestamp,fingerprint_md5,bytes_sent,lock_time,killed," \
                     "cmd_user,query_sql,cmd_ip,query_date,cmd_schema) values" \
                     "(" + str(instance_id) + ",'" + str(event['query_time']) + "','" + str(
            event['last_errno']) + "','" + str(event['rows_examined']) + "'" \
                                                                         "," + str(event['rows_sent']) + "," + str(
            event['timestamp']) + ",'" + str(fingerprint_md5) + "'," + str(event['bytes_sent']) + "" \
                                                                                                  ",'" + str(
            event['lock_time']) + "'," + str(event['killed']) + ",'" + str(event['user']) + "','" + str(
            event['sql']) + "'" \
                            ",'" + str(event['ip']) + "','" + str(qtime) + "','" + str(
            event['schema']) + "')"
        print( insert_sql)
        cursor.execute(insert_sql)
        cursor.execute("commit;")
        cursor.close

    def getMysqlManageInfo(self,ip,port):
        conf = {}
        poolCursor = conn.cursor()
        getMysqlIdSql = "select id,mysql_host,mysql_port,manage_user,manage_password from mysql_instances where mysql_host='%s' and mysql_port=%s;" % (
        ip, port)
        print(getMysqlIdSql)
        poolCursor.execute(getMysqlIdSql)
        # 获取 ID 和 管理账户
        result = poolCursor.fetchone()
        print(str("获取mysql id 结果集:" + str(result)))
        if result:
            conf['mysqlId'] = result[0]
            conf['mysqlHost'] = result[1]
            conf['mysqlPort'] = result[2]
            conf['manageUser'] = result[3]
            conf['managePassword'] = result[4]
            return conf
        return conf

    def insertPerformanceQuota(self,conn,data,instance_id):
        insertSQL = "insert mysql_performance_quota(instance_id,create_time,com_select,com_delete,questions,com_insert,com_commit,com_rollback,com_update,qps,tps) values("\
                    +str(instance_id)+",'"+str(data['create_time'])+"',"+str(data['Com_select'])+","+str(data['Com_delete'])+","\
                    +str(data['Questions'])+","+str(data['Com_insert'])+","+str(data['Com_commit'])+","+str(data['Com_rollback'])+","\
                    +str(data['Com_update'])+","+str(data['Questions'])+","+str(data['Com_commit']+data['Com_rollback'])+");"
        print(insertSQL)
        cursor = conn.cursor()
        cursor.execute(insertSQL)
        cursor.execute("commit")
        cursor.close










if __name__ == '__main__':

    a = GetHandler()
    a.alter_slowlog_find_first(2,'zeus','select 1',conn)