
MySQL example app (snippet)

Article Outline

Python MySQL example 'app'

Functions in program:

  • def get_predict_prometheus():
  • def get_api_traces():
  • def get_prometheus_data():
  • def load_agged_trace(start_ts: int, end_ts: int, sql=None):
  • def get_prome_result(args):
  • def hello_world():

Modules used in program:

  • import requests
  • import pymysql
  • import re
  • import random
  • import json
  • import click

python app

Python MySQL example: app

import click

# Set click's ``disable_unicode_literals_warning`` flag right after importing
# click and before the remaining imports below.
click.disable_unicode_literals_warning = True

import json
import random
import re
from concurrent.futures import ThreadPoolExecutor, as_completed

# from flaskext.mysql import MySQL
import pymysql
import requests
from flask import Flask, request, jsonify
from tqdm import tqdm

# Flask application object for this module.
# NOTE(review): no ``@app.route`` decorators are visible on the handler
# functions in this snippet; they may have been lost in extraction -- confirm.
app = Flask(__name__)

# mysql = MySQL()
# mysql.init_app(app)

class Database:
    """Holds a pymysql connection and a cursor created from it.

    NOTE(review): parts of this class appear truncated in this snippet
    (see the inline notes); confirm against the complete file before use.
    """

    def __init__(self):
        """Open the connection and store it with a cursor on the instance."""
        # Hard-coded connection settings; host is an empty string here.
        host = ""
        port = 4000
        user = "root"
        db = "mysql"

        # NOTE(review): the remaining arguments and the closing parenthesis of
        # this connect(...) call are not visible in this snippet. Presumably
        # port, user and db are passed here -- confirm.
        self.con = pymysql.connect(host=host,

        self.cursor = self.con.cursor()

    def save_metrics(self, metrics):
        """Iterate over ``metrics`` and build one INSERT per item.

        Each item is indexed (``item[0]`` is used as the ``timestamps`` value)
        and is also serialised in full with ``json.dumps`` for the ``data``
        column; ``is_sys`` is always 1.
        """
        for item in metrics:
                # NOTE(review): only the SQL text and its parameter tuple are
                # visible; the call that consumes them (presumably a cursor
                # execute) is missing from this snippet -- confirm.
                'insert into my_metrics (timestamps, is_sys, data) values (%s, %s, %s)', (item[0], 1, json.dumps(item))

# db = Database()

def hello_world():
    """Return the fixed greeting text ``'Hello World!'``."""
    greeting = 'Hello World!'
    return greeting

def get_prome_result(args):
    """Issue an HTTP GET and return ``['data']['result']`` of its JSON body.

    Args:
        args: Mapping read with ``.get``; the keys ``'start_time'``,
            ``'end_time'`` (both defaulting to 0) and ``'query'``
            (defaulting to ``""``) are used.

    Returns:
        The value stored under ``response_json['data']['result']``.

    NOTE(review): presumably this queries a Prometheus range API (the callers
    pass a PromQL-looking query) -- confirm against the complete file.
    """
    start_time = args.get('start_time', 0)
    end_time = args.get('end_time', 0)
    query = args.get('query', "")

    # NOTE(review): the URL, parameters and closing parenthesis of this
    # requests.get(...) call are not visible in this snippet; start_time,
    # end_time and query are presumably sent as request parameters -- confirm.
    res = requests.get(

    response_json = res.json()
    value_list = response_json['data']['result']
    return value_list

def load_agged_trace(start_ts: int, end_ts: int, sql=None):
    """Fetch traces for a time window and aggregate those of SQL type 1.

    Args:
        start_ts: Window start; passed on as ``start_time`` and, multiplied
            by 1,000,000, as the trace query's ``start``.
        end_ts: Window end; handled like ``start_ts``.
        sql: SQL text used to classify traces. When ``None`` it is taken
            from the first trace processed.

    Returns:
        The dict built by the nested ``agg_traces`` for ``sql_type`` 1:
        ``'sql_type'``, a random ``'sample'`` trace, the mean of every
        int-valued field, and ``'cpu'``.

    NOTE(review): several statements in this function appear truncated in
    this snippet (see inline notes); confirm against the complete file.
    """
    # CPU-usage query evaluated over the same window.
    # NOTE(review): the closing "})" of this call is not visible here.
    prometheus_result = get_prome_result(
        {'start_time': start_ts,
         'end_time': end_ts,
         'query': '100 - (avg by (instance) (irate(node_cpu_seconds_total{job="node_exporter",mode="idle"}[30s])) * 100)'

    # Trace search limited to 40 results for service 'TiDB' and operation
    # 'session.runStmt'. The timestamps are scaled by 1,000,000 (presumably
    # seconds -> microseconds -- confirm). The URL is an empty string here.
    # NOTE(review): the end of this call is not visible; as written `traces`
    # would be a Response object, but the loop below indexes each element like
    # a decoded trace dict, so a `.json()`/key lookup is presumably missing.
    traces = requests.get('', params={
        'start': int(start_ts) * 1000000,
        'end': int(end_ts) * 1000000,
        'limit': 40,
        'lookback': 'custom',
        'maxDuration': '',
        'minDuration': '',
        'service': 'TiDB',
        'operation': 'session.runStmt',

    new_traces = []
    for trace in traces:
        trace_ = {}
        # SQL text: first field value of the first log entry of the first
        # span whose operationName is 'session.runStmt'.
        trace_['sql'] = [s for s in trace['spans'] if s['operationName']
                         == 'session.runStmt'][0]['logs'][0]['fields'][0]['value']
        # Once set (by the caller or by the first trace), `sql` is reused for
        # every later trace, so all traces get classified by the same text.
        if sql is None:
            sql = trace_['sql']

        # Classify the statement: 1 = select by id, 2 = count by age,
        # 3 = insert, 0 = anything else.
        t = 0
        if re.search('select id from my_user where id=(.*)', sql):
            t = 1
        # NOTE(review): this pattern is not a raw string, so '\(' and '\*'
        # are invalid string escapes that newer Python versions warn about.
        elif re.search('select count\(\*\) from my_user where age=(.*)', sql):
            t = 2
        elif re.search('insert (.*)', sql):
            t = 3
        trace_['sql_type'] = t
        # Record each span's duration under its operation name (a later span
        # with the same name overwrites an earlier one).
        for s in trace['spans']:
            trace_[s['operationName']] = s['duration']
        # NOTE(review): nothing visible appends trace_ to new_traces, so the
        # list stays empty as shown; an append is presumably missing here.

    def agg_traces(traces, sql_type):
        """Average the int-valued fields of the traces matching ``sql_type``."""
        # The `traces` argument is immediately replaced by a filter over the
        # enclosing new_traces list.
        traces = [t for t in new_traces if t['sql_type'] == sql_type]
        # NOTE(review): the closing brace of this dict literal is not visible.
        # random.choice raises IndexError when `traces` is empty.
        traces_ = {
            'sql_type': sql_type,
            'sample': random.choice(traces),
        # Keys are taken from the first trace only; traces lacking a key
        # contribute 0 to its mean.
        for k in traces[0]:
            if type(traces[0][k]) == int:
                l = [t.get(k, 0) for t in traces]
                traces_[k] = sum(l) / len(l)

        # CPU value: second element of the first sample of the first series.
        traces_['cpu'] = float(prometheus_result[0]['values'][0][1])
        return traces_

    return agg_traces(new_traces, 1)

def get_prometheus_data():
    """Forward the request's query parameters to ``get_prome_result``.

    Returns:
        The ``jsonify``-wrapped value returned by ``get_prome_result``.
    """
    query_args = request.args.to_dict()
    prome_result = get_prome_result(query_args)
    return jsonify(prome_result)

def get_api_traces():
    """Proxy the request's query parameters to the trace endpoint.

    The parameters are sent unchanged in a GET request (the URL is an empty
    string in this source) and the decoded JSON reply is returned through
    ``jsonify``.
    """
    query_args = request.args.to_dict()
    upstream = requests.get('', params=query_args)
    return jsonify(upstream.json())

def get_predict_prometheus():
    """Collect aggregated traces for consecutive windows and return them as JSON.

    Reads ``start_time``, ``type``, ``metric_type`` and ``sql`` from the
    request's query parameters, runs ``load_agged_trace`` for ``STEPS``
    windows of ``INTERVAL`` each on a thread pool, and responds with the keys
    ``'origin'``, ``'prediction'`` (always an empty list), ``'fields'`` and
    ``'timeseries'``.

    NOTE(review): several statements in this function appear truncated in
    this snippet (see inline notes); confirm against the complete file.
    """
    args = request.args.to_dict()
    start_time: int = int(args.get('start_time', 0))
    # end_time = args.get('end_time', 0)
    # `query` is read but not used anywhere in the visible code.
    query = args.get('type', 'tracing')
    # The default is the int 1, while the comparisons below are against the
    # strings '0', '1' and '2'; when the parameter is absent no branch
    # matches and `fields` stays empty.
    metric_type = args.get('metric_type', 1)
    sql = args.get('sql', '')

    STEPS = 10
    INTERVAL = 10
    # One result slot per window; slots stay None where no result is stored.
    result = [None for x in range(STEPS)]
    # Window start times; the step is the literal 10, which currently equals
    # INTERVAL.
    timeseries = [start_time + i * 10 for i in range(STEPS)]

    with ThreadPoolExecutor(max_workers=10) as executor:
        # Start the load operations and mark each future with its window index
        # NOTE(review): the iterable of this dict comprehension (presumably
        # range(STEPS)) and its closing brace are not visible in this snippet.
        future_to_url = {executor.submit(
            load_agged_trace, start_time + i * INTERVAL, start_time + i * INTERVAL + INTERVAL, sql): i for i in
        for future in tqdm(as_completed(future_to_url)):
            i = future_to_url[future]
            # NOTE(review): the `try:` line and the body of the `except`
            # clause are not visible in this snippet.
                result[i] = future.result()
            except Exception as exc:

    # Field names to report, chosen by metric_type.
    # NOTE(review): the tail of each list below (further names and the
    # closing bracket) is not visible in this snippet.
    fields = []
    if metric_type == '0':
        fields = ['cpu', 'session.runStmt', 'session.getTxnFuture', 'session.ParseSQL', 'executor.Compile',
                  'pdclient.processTSORequests', 'session.Execute', 'GetTSAsync', 'executor.Next',
    elif metric_type == '1':
        fields = ['cpu', 'session.runStmt', 'session.getTxnFuture', 'session.ParseSQL', 'executor.Compile',
                  'session.CommitTxn', 'pdclient.processTSORequests', 'session.Execute', 'GetTSAsync',
                  'executor.Next', 'server.dispatch', '/tikvpb.Tikv/Coprocessor', 'tableReader.Next',
    elif metric_type == '2':
        fields = ['cpu', 'session.runStmt', 'session.getTxnFuture', 'session.ParseSQL', 'executor.Compile',
                  'session.CommitTxn', 'pdclient.processTSORequests', 'session.Execute', 'GetTSAsync',
                  'executor.Next', 'server.dispatch', '/tikvpb.Tikv/Coprocessor', 'tableReader.Next',
                  'streamAgg.Next', 'executor.handleNoDelayExecutor', '/tikvpb.Tikv/KvCommit',

    res = []

    for field in fields:
        # Per-field series across the windows that produced a result; a
        # missing field yields an empty list for that window.
        # NOTE(review): nothing visible adds field_result to `res`, so
        # 'origin' is an empty list as shown -- an append is presumably missing.
        field_result = [item.get(field, []) for item in result if item is not None]

    return jsonify({'origin': res, 'prediction': [], 'fields': fields, 'timeseries': timeseries})

if __name__ == '__main__':