HOME/Articles/

MySQL example app (snippet)

Article Outline

Python mysql example 'app'

Functions in program:

  • def get_predict_prometheus():
  • def get_api_traces():
  • def get_prometheus_data():
  • def load_agged_trace(start_ts: int, end_ts: int, sql=None):
  • def get_prome_result(args):
  • def hello_world():

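Modules used in program:

  • import requests
  • import pymysql
  • import re
  • import random
  • import json
  • import click
  • from concurrent.futures import ThreadPoolExecutor, as_completed
  • from flask import Flask, request, jsonify
  • from tqdm import tqdm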

python app

Python mysql example: app

import click

# NOTE(review): set immediately after importing click and before the remaining
# imports; presumably meant to silence click's unicode_literals warning —
# confirm the installed click version still reads this attribute.
click.disable_unicode_literals_warning = True

import json
import random
import re
from concurrent.futures import ThreadPoolExecutor, as_completed

# from flaskext.mysql import MySQL
import pymysql
import requests
from flask import Flask, request, jsonify
from tqdm import tqdm

# Flask application object; the @app.route decorators in this file register
# their view functions on it and the __main__ guard runs it.
app = Flask(__name__)


# mysql = MySQL()
# mysql.init_app(app)


class Database:
    """Small wrapper around a pymysql connection used to store metrics.

    The connection is opened in the constructor and one DictCursor is kept on
    the instance (``self.cursor``) for every statement issued through it.
    """

    def __init__(self, host="106.75.2.46", port=4000, user="root", db="mysql"):
        """Open the connection and create the shared cursor.

        Args:
            host: Server address; defaults to the value that used to be
                hard-coded here, so ``Database()`` behaves as before.
            port: Server TCP port.
            user: Account name (no password is supplied).
            db: Schema selected after connecting.
        """
        self.con = pymysql.connect(host=host,
                                   port=port,
                                   user=user,
                                   db=db,
                                   cursorclass=pymysql.cursors.DictCursor)

        self.cursor = self.con.cursor()

    def save_metrics(self, metrics):
        """Insert every entry of *metrics* into ``my_metrics`` as one transaction.

        Each entry must be indexable: ``item[0]`` is stored in ``timestamps``,
        ``is_sys`` is always 1 and the whole entry is stored JSON-encoded in
        ``data``.

        Args:
            metrics: Iterable of JSON-serialisable sequences. An empty
                iterable is a no-op (nothing is executed or committed).

        Raises:
            Exception: Whatever the driver raises for the insert; the
                transaction is rolled back before the error propagates.
        """
        rows = [(item[0], 1, json.dumps(item)) for item in metrics]
        if not rows:
            return

        try:
            # One batched statement and one commit instead of a commit per row.
            self.cursor.executemany(
                'insert into my_metrics (timestamps, is_sys, data) values (%s, %s, %s)', rows
            )
        except Exception:
            # Do not leave a half-written batch pending on the connection.
            self.con.rollback()
            raise
        self.con.commit()

    def close(self):
        """Release the cursor and the underlying connection."""
        self.cursor.close()
        self.con.close()


# db = Database()


@app.route('/')
def hello_world():
    """Answer requests for the root URL with a fixed plain-text greeting."""
    greeting = 'Hello World!'
    return greeting


def get_prome_result(args):
    """Run a Prometheus range query and return its result series.

    Args:
        args: Mapping that may contain ``start_time`` and ``end_time``
            (both default to 0) and ``query`` (defaults to ""). Other keys
            are ignored.

    Returns:
        The ``data.result`` value of the Prometheus JSON response.

    Raises:
        requests.RequestException: If the HTTP request fails or times out.
        KeyError: If the response JSON has no ``data``/``result`` entry.
    """
    # Let requests build the query string: interpolating the PromQL text into
    # the URL by hand leaves characters such as '+', '&' and '#' unescaped,
    # which corrupts or truncates the expression on the server side.
    query_params = {
        'query': args.get('query', ""),
        'start': args.get('start_time', 0),
        'end': args.get('end_time', 0),
        'step': 14,
    }

    res = requests.get(
        'http://106.75.2.46:9090/api/v1/query_range',
        params=query_params,
        timeout=30,  # without a timeout an unresponsive server blocks the caller forever
    )

    response_json = res.json()
    return response_json['data']['result']


# Ordered (pattern, sql_type) pairs; the first pattern found in the SQL wins.
_SQL_TYPE_PATTERNS = (
    (re.compile(r'select id from my_user where id=(.*)'), 1),
    (re.compile(r'select count\(\*\) from my_user where age=(.*)'), 2),
    (re.compile(r'insert (.*)'), 3),
)


def _classify_sql(sql):
    """Return the sql_type code (1, 2 or 3) for *sql*, or 0 when no pattern matches."""
    for pattern, sql_type in _SQL_TYPE_PATTERNS:
        if pattern.search(sql):
            return sql_type
    return 0


def _summarize_trace(trace, sql=None):
    """Flatten one trace into ``{'sql', 'sql_type', <operationName>: duration, ...}``."""
    own_sql = [s for s in trace['spans'] if s['operationName']
               == 'session.runStmt'][0]['logs'][0]['fields'][0]['value']
    # Classify by the caller's SQL when one was given, otherwise by this
    # trace's own statement (never by a statement taken from another trace).
    summary = {'sql': own_sql, 'sql_type': _classify_sql(sql if sql else own_sql)}
    for span in trace['spans']:
        summary[span['operationName']] = span['duration']
    return summary


def _aggregate_traces(summaries, sql_type):
    """Average the integer-valued entries of all summaries having *sql_type*."""
    matching = [t for t in summaries if t['sql_type'] == sql_type]
    if not matching:
        raise IndexError(f'no traces with sql_type {sql_type} to aggregate')

    aggregated = {
        'sql_type': sql_type,
        'sample': random.choice(matching),
    }
    # Only keys present in the first matching summary are averaged; summaries
    # lacking a key contribute 0. 'sql_type' is itself an int, so it is
    # averaged as well and ends up as a float in the result.
    for key, value in matching[0].items():
        if isinstance(value, int) and not isinstance(value, bool):
            values = [t.get(key, 0) for t in matching]
            aggregated[key] = sum(values) / len(values)
    return aggregated


def load_agged_trace(start_ts: int, end_ts: int, sql=None):
    """Fetch traces and CPU usage for a time window and aggregate the type-1 traces.

    Queries Prometheus (through ``get_prome_result``) for CPU usage and the
    traces endpoint for up to 40 ``session.runStmt`` traces of service
    ``TiDB`` between *start_ts* and *end_ts*, classifies every trace and
    averages the span durations of those classified as sql_type 1.

    Args:
        start_ts: Window start; multiplied by 1,000,000 for the traces query.
        end_ts: Window end; multiplied by 1,000,000 for the traces query.
        sql: Optional SQL text used to classify every trace. When ``None`` or
            empty, each trace is classified by its own recorded statement.

    Returns:
        A dict with ``sql_type``, a randomly chosen ``sample`` trace summary,
        the averaged integer-valued entries, and ``cpu`` taken from the first
        value of the first Prometheus series.

    Raises:
        IndexError: If no trace is classified as type 1, or Prometheus
            returned no series.
        requests.RequestException: If an HTTP request fails or times out.
    """
    prometheus_result = get_prome_result(
        {'start_time': start_ts,
         'end_time': end_ts,
         'query': '100 - (avg by (instance) (irate(node_cpu_seconds_total{job="node_exporter",mode="idle"}[30s])) * 100)'
         }
    )

    traces = requests.get('http://106.75.2.46:16686/api/traces', params={
        'start': int(start_ts) * 1000000,
        'end': int(end_ts) * 1000000,
        'limit': 40,
        'lookback': 'custom',
        'maxDuration': '',
        'minDuration': '',
        'service': 'TiDB',
        'operation': 'session.runStmt',
    }, timeout=30).json()['data']

    summaries = [_summarize_trace(trace, sql) for trace in traces]

    aggregated = _aggregate_traces(summaries, 1)
    if not prometheus_result:
        raise IndexError('Prometheus returned no CPU series for the requested window')
    aggregated['cpu'] = float(prometheus_result[0]['values'][0][1])
    return aggregated


@app.route('/get_prometheus_data')
def get_prometheus_data():
    """Pass the request's query-string arguments to get_prome_result and reply with its result as JSON."""
    series = get_prome_result(request.args.to_dict())
    return jsonify(series)


@app.route('/api/traces')
def get_api_traces():
    """Relay the incoming query-string arguments to the upstream traces API and return its JSON body."""
    upstream = requests.get('http://106.75.2.46:16686/api/traces', params=request.args.to_dict())
    payload = upstream.json()
    return jsonify(payload)


# Fields reported per ``metric_type``; each level extends the previous one.
_BASE_FIELDS = ['cpu', 'session.runStmt', 'session.getTxnFuture', 'session.ParseSQL', 'executor.Compile',
                'session.CommitTxn', 'pdclient.processTSORequests', 'session.Execute', 'GetTSAsync',
                'executor.Next', 'server.dispatch']
_COPROCESSOR_FIELDS = ['/tikvpb.Tikv/Coprocessor', 'tableReader.Next', 'streamAgg.Next']
_WRITE_FIELDS = ['executor.handleNoDelayExecutor', '/tikvpb.Tikv/KvCommit', '/tikvpb.Tikv/KvPrewrite']
_FIELDS_BY_METRIC_TYPE = {
    '0': _BASE_FIELDS,
    '1': _BASE_FIELDS + _COPROCESSOR_FIELDS,
    '2': _BASE_FIELDS + _COPROCESSOR_FIELDS + _WRITE_FIELDS,
}


@app.route('/get_predict_prometheus')
def get_predict_prometheus():
    """Aggregate traces over ten consecutive 10-unit windows and return them per field.

    Query-string arguments:
        start_time: Integer start of the first window (default 0).
        metric_type: '0', '1' or '2', selecting the field list (default '1');
            any other value yields an empty field list.
        sql: Optional SQL text forwarded to ``load_agged_trace``; when absent
            or empty, ``None`` is forwarded.

    Returns:
        JSON with ``origin`` (one list per field holding the value from every
        window that loaded successfully), ``prediction`` (always empty),
        ``fields`` and ``timeseries`` (the window start values). A
        non-integer ``start_time`` gets a 400 response with an ``error`` key.
    """
    args = request.args.to_dict()
    try:
        start_time: int = int(args.get('start_time', 0))
    except ValueError:
        return jsonify({'error': 'start_time must be an integer'}), 400
    # Query-string values are strings, so the default must be a string too;
    # an int default never matches any key and produces no fields.
    metric_type = args.get('metric_type', '1')
    # An empty SQL string matches no statement pattern, which makes every
    # window fail; forward None so load_agged_trace uses its own default.
    sql = args.get('sql') or None

    STEPS = 10
    INTERVAL = 10
    timeseries = [start_time + i * INTERVAL for i in range(STEPS)]
    result = [None] * STEPS

    with ThreadPoolExecutor(max_workers=10) as executor:
        # Map each future to the index of the window it loads so results land
        # in chronological order regardless of completion order.
        future_to_step = {
            executor.submit(load_agged_trace, window_start, window_start + INTERVAL, sql): i
            for i, window_start in enumerate(timeseries)
        }
        for future in tqdm(as_completed(future_to_step)):
            i = future_to_step[future]
            try:
                result[i] = future.result()
            except Exception as exc:
                # Best effort: a failed window stays None and is left out of
                # the response, but record which one failed and why.
                app.logger.warning('trace aggregation for window %d failed: %r', i, exc)

    fields = list(_FIELDS_BY_METRIC_TYPE.get(metric_type, []))
    res = [
        [item.get(field, []) for item in result if item is not None]
        for field in fields
    ]

    return jsonify({'origin': res, 'prediction': [], 'fields': fields, 'timeseries': timeseries})


# Run the Flask app with its default settings only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    app.run()