HOME/Articles/

MySQL example: models (snippet)

Article Outline

Python mysql example 'models'

Modules used in program:

  • import datetime

Python models

Python mysql example: models

from app import app, db
from flask_user import UserMixin, UserManager, SQLAlchemyAdapter
from flask_user.forms import RegisterForm
from passlib.apps import mysql_context
# import forms
import datetime


class User(db.Model, UserMixin):
  """Login account with profile fields, role links and view-permission checks.

  Roles are attached through ``UserRoles`` rows. The ``can_view_*`` helpers
  decide access from the name of the role returned by ``get_role()``.
  """
  id = db.Column(db.Integer, primary_key=True)
  username = db.Column(db.String(50), nullable=False, unique=True)
  password = db.Column(db.String(255), nullable=False, default='')
  reset_password_token = db.Column(db.String(100), nullable=False, default='')

  # User Email information
  email = db.Column(db.String(255), nullable=False, unique=True)
  confirmed_at = db.Column(db.DateTime())

  # User information
  is_enabled = db.Column(db.Boolean(), nullable=False, default=False)
  first_name = db.Column(db.String(50), nullable=False, default='')
  last_name = db.Column(db.String(50), nullable=False, default='')

  roles = db.relationship('Role', secondary="user_roles", backref=db.backref('users', lazy='dynamic'))
  school_joins = db.relationship('SchoolCoordinatorJoin', backref='coordinator', lazy='dynamic')

  def is_active(self):
    """Return the value of ``is_enabled``."""
    # NOTE(review): newer Flask-Login releases expose ``is_active`` as a
    # property rather than a method -- confirm against the installed version.
    return self.is_enabled

  def get_full_name(self):
    """Return ``"<first_name> <last_name>"``."""
    return "%s %s" % (self.first_name, self.last_name)

  def _has_role(self, name):
    """Return True when a UserRoles row links this user to a role named ``name``.

    One joined COUNT query replaces the previous "load every link, then load
    every role one by one" pattern, and it cannot trip over a link whose role
    row is missing.
    """
    matches = (db.session.query(Role)
               .join(UserRoles, UserRoles.role_id == Role.id)
               .filter(UserRoles.user_id == self.id, Role.name == name)
               .count())
    return matches > 0

  def _coordinates_school(self, school_id):
    """Return True when a SchoolCoordinatorJoin row ties this user to ``school_id``."""
    matches = db.session.query(SchoolCoordinatorJoin).filter(
      SchoolCoordinatorJoin.school_id == school_id,
      SchoolCoordinatorJoin.user_id == self.id).count()
    return matches > 0

  def is_coordinator(self):
    """Return True when the user holds the ``coordinator`` role."""
    return self._has_role("coordinator")

  def get_role(self):
    """Return the Role of the user's first UserRoles row, or None.

    None is returned both when the user has no UserRoles row and when the
    linked Role row does not exist.
    """
    user_role = self.get_user_role()
    if user_role is None:
      return None
    return db.session.query(Role).filter_by(id=user_role.role_id).first()

  def get_user_role(self):
    """Return the user's first UserRoles row, or None when there is none."""
    return db.session.query(UserRoles).filter_by(user_id=self.id).first()

  def is_mentor(self):
    """Return True when the user holds the ``mentor`` role."""
    return self._has_role('mentor')

  def get_defaults(self):
    """Return the user's editable fields as a dict.

    ``role_id`` is the role id rendered as text, or None when the user has no
    role.
    """
    role = self.get_role()
    # ``u'%s' % ...`` yields the same unicode text as the former
    # ``unicode(str(...), 'utf-8')`` on Python 2 and also works on Python 3,
    # where the ``unicode`` builtin no longer exists.
    role_id = u'%s' % role.id if role is not None else None
    return {'id': self.id,
        'username': self.username,
        'email': self.email,
        'first_name': self.first_name,
        'last_name': self.last_name,
        'role_id': role_id
        }

  def update_all(self, username, email, first_name, last_name):
    """Overwrite the editable profile fields; the caller commits the session."""
    self.username = username
    self.email = email
    self.first_name = first_name
    self.last_name = last_name

  def can_view_student(self, student_id):
    """Return whether this user may view the student with ``student_id``.

    * admin: always True.
    * mentor: True when the student's ``mentor_id`` is this user.
    * coordinator: True when the student's school exists and this user is
      joined to it as coordinator.
    * anything else (including no role, or an unknown student): False.
    """
    role = self.get_role()
    if role is None:
      return False
    if role.name == 'admin':
      return True

    # Only the non-admin branches need the student row.
    student = db.session.query(Student).filter_by(id=student_id).first()
    if student is None:
      return False

    if role.name == 'mentor':
      return student.mentor_id == self.id
    if role.name == 'coordinator':
      school = db.session.query(School).filter_by(id=student.school_id).first()
      if school is None:
        return False
      return self._coordinates_school(school.id)
    return False

  def can_view_school(self, school_id):
    """Return whether this user may view the school with ``school_id``.

    Admins always may; coordinators only for schools they are joined to;
    everyone else (including users without a role) may not.
    """
    role = self.get_role()
    if role is None:
      return False
    if role.name == 'admin':
      return True
    if role.name == 'coordinator':
      return self._coordinates_school(school_id)
    return False

  def can_view_response(self, response_id):
    """Return whether this user may view the survey response ``response_id``.

    Delegates to ``can_view_student`` for the response's student; an unknown
    response is never viewable.
    """
    response = db.session.query(SurveyResponse).filter_by(id=response_id).first()
    if response is None:
      return False
    return self.can_view_student(response.student_id)


# Flask-User wiring: the adapter is built from the SQLAlchemy handle and the
# User model, then passed to UserManager together with the Flask app.
# These statements run at import time and reference User, so they must stay
# below the User class definition.
db_adapter = SQLAlchemyAdapter(db, User)
user_manager = UserManager(db_adapter, app)

class Role(db.Model):
  """A named role; the code in this module checks for the names
  'admin', 'mentor' and 'coordinator'.
  """
  id = db.Column(db.Integer, primary_key=True)
  # Role names are unique, so a name identifies at most one row.
  name = db.Column(db.String(50), unique=True)

class UserRoles(db.Model):
  """Association row linking one user to one role.

  No ``__tablename__`` is set here; presumably the derived table name is the
  ``user_roles`` table that ``User.roles`` names as its secondary -- confirm.
  """
  id = db.Column(db.Integer, primary_key=True)
  # Both foreign keys are declared with ON DELETE CASCADE.
  user_id = db.Column(db.Integer(), db.ForeignKey('user.id', ondelete="CASCADE"))
  role_id = db.Column(db.Integer(), db.ForeignKey('role.id', ondelete="CASCADE"))

class Teacher(db.Model):
  """A teacher belonging to a school, with students and field trips attached."""
  __tablename__ = "teachers"
  id = db.Column(db.Integer, primary_key=True)
  name = db.Column(db.String(255))
  students = db.relationship('Student', backref='teachers', lazy='dynamic')
  fieldtrips = db.relationship('FieldTrip', backref='teachers', lazy='dynamic')
  school_id = db.Column(db.Integer, db.ForeignKey("schools.id"))

  def __init__(self, name, school_id):
    """Create a teacher called ``name`` at the school with id ``school_id``."""
    self.name, self.school_id = name, school_id

  def get_school_name(self):
    """Return the name of the teacher's school, or None when no school matches."""
    lookup = db.session.query(School).filter_by(id=self.school_id)
    school = lookup.first()
    return None if school is None else school.name

  def get_student_count(self):
    """Return how many Student rows reference this teacher."""
    roster = db.session.query(Student).filter_by(teacher_id=self.id)
    return roster.count()

class Student(db.Model):
  """A student with teacher, school and mentor links plus survey/field-trip data."""
  __tablename__ = "students"
  id = db.Column(db.Integer, primary_key=True)
  first_name = db.Column(db.String(128))
  last_name = db.Column(db.String(128))
  grade = db.Column(db.Integer)
  teacher_id = db.Column(db.Integer, db.ForeignKey("teachers.id"))
  gender = db.Column(db.String(1))
  aaft = db.Column(db.Boolean)
  mentoring = db.Column(db.Boolean)
  school_id = db.Column(db.Integer, db.ForeignKey("schools.id"))
  mentor_id = db.Column(db.Integer, db.ForeignKey("user.id"))
  field_trip_events = db.relationship('FieldTripEvent', backref='attendee', lazy='dynamic')
  surveys = db.relationship('SurveyResponse', backref='student', lazy='dynamic')
  answers = db.relationship('Answer', backref='answerer', lazy='dynamic')

  def __init__(self, first_name, last_name, grade, teacher_id, gender, aaft=False, mentoring=False, school_id=None, mentor_id=None):
    """Create a student; ``grade`` is coerced with ``int()`` (e.g. from form text)."""
    self.first_name = first_name
    self.last_name = last_name
    self.grade = int(grade)
    self.teacher_id = teacher_id
    self.gender = gender
    self.aaft = aaft
    self.mentoring = mentoring
    self.school_id = school_id
    self.mentor_id = mentor_id

  def update_all(self, first_name, last_name, grade, teacher_id, gender, aaft, mentoring, school_id, mentor_id):
    """Overwrite every editable field; ``grade`` is coerced with ``int()``."""
    self.first_name = first_name
    self.last_name = last_name
    self.grade = int(grade)
    self.teacher_id = teacher_id
    self.gender = gender
    self.aaft = aaft
    self.mentoring = mentoring
    self.school_id = school_id
    self.mentor_id = mentor_id

  def get_identifying_info(self):
    """Return ``"<first> <last>, <teacher name>"``."""
    return "%s %s, %s" % (self.first_name, self.last_name, self.get_teacher_name())

  def get_school_name(self):
    """Return the school's name, or the string 'None' when no school matches."""
    school = db.session.query(School).filter_by(id=self.school_id).first()
    if school is not None:
      return school.name
    return 'None'

  def get_teacher_name(self):
    """Return the teacher's name, or the string 'None' when no teacher matches."""
    teacher = db.session.query(Teacher).filter_by(id=self.teacher_id).first()
    if teacher is not None:
      return teacher.name
    return 'None'

  def get_mentor_name(self):
    """Return the mentor's full name, or the string 'None' when no mentor matches."""
    mentor = db.session.query(User).filter_by(id=self.mentor_id).first()
    if mentor is not None:
      return '%s %s' % (mentor.first_name, mentor.last_name)
    return 'None'

  def get_field_trip_count(self):
    """Return the number of FieldTripEvent rows for this student."""
    # COUNT in the database instead of loading every row just to call len().
    return db.session.query(FieldTripEvent).filter_by(student_id=self.id).count()

  def get_survey_count(self):
    """Return the number of SurveyResponse rows for this student."""
    # COUNT in the database instead of loading every row just to call len().
    return db.session.query(SurveyResponse).filter_by(student_id=self.id).count()

  def is_on_roster(self, field_trip_id):
    """Return True when a FieldTripEvent links this student to ``field_trip_id``."""
    count = db.session.query(FieldTripEvent).filter(
      FieldTripEvent.student_id == self.id,
      FieldTripEvent.field_trip_id == field_trip_id).count()
    return count >= 1

  def get_defaults(self):
    """Return the student's fields as a dict of form defaults.

    Booleans are rendered as '1'/'0'; ``grade``, ``school`` and ``mentor`` are
    rendered with ``str()``.
    """
    return {'id': self.id,
        'first_name': self.first_name,
        'last_name': self.last_name,
        'grade': str(self.grade),
        'teacher_id': self.teacher_id,
        'gender': self.gender,
        # Truthiness test so an unset (NULL) flag is reported as '0'; the
        # former ``== False`` comparison turned None into '1'.
        'aaft': '1' if self.aaft else '0',
        'mentoring': '1' if self.mentoring else '0',
        'school': str(self.school_id),
        'mentor': str(self.mentor_id)
        }

class FieldTrip(db.Model):
  """A dated field trip to a venue, organised by a teacher."""
  __tablename__ = "field_trips"
  id = db.Column(db.Integer, primary_key=True)
  date = db.Column(db.DateTime)
  venue_id = db.Column(db.Integer, db.ForeignKey("field_trip_venues.id"))
  teacher_id = db.Column(db.Integer, db.ForeignKey("teachers.id"))
  survey = db.relationship('Survey', backref='fieldtrip', lazy='dynamic')

  def __init__(self, venue_id, date, teacher_id):
    """Create a trip to ``venue_id`` on ``date`` for ``teacher_id``."""
    self.date = date
    self.venue_id = venue_id
    self.teacher_id = teacher_id

  def get_identifying_info(self):
    """Return ``"<venue name>, <date>"`` with the date formatted by ``%x``.

    Assumes ``date`` is set, as ``__init__`` requires.
    """
    return "%s, %s" % (self.get_venue_name(), self.date.strftime('%x'))

  def get_venue_name(self):
    """Return the venue's name, or the string 'None' when no venue matches.

    The 'None' placeholder follows the convention of the Student name getters
    in this module and replaces an AttributeError on a missing venue.
    """
    venue = db.session.query(FieldTripVenue).filter_by(id=self.venue_id).first()
    if venue is None:
      return 'None'
    return venue.name

  def get_student_count(self):
    """Return the number of distinct students with an event on this trip."""
    # DISTINCT + COUNT in SQL instead of loading every event row and
    # de-duplicating the ids in Python.
    return (db.session.query(FieldTripEvent.student_id)
            .filter(FieldTripEvent.field_trip_id == self.id)
            .distinct()
            .count())


# FieldTrip has FieldTripVenue
class FieldTripVenue(db.Model):
  """A named venue that field trips can point at via ``FieldTrip.venue_id``."""
  __tablename__ = "field_trip_venues"
  id = db.Column(db.Integer, primary_key=True)
  name = db.Column(db.String(255))
  # Dynamic (query-style) collection of trips; also adds a ``venue`` backref
  # on FieldTrip.
  field_trips = db.relationship('FieldTrip', backref='venue', lazy='dynamic')

  def __init__(self, name):
    """Create a venue called ``name``."""
    self.name = name

# Student has FieldTripEvent
# FieldTrip has FieldTripEvent
class FieldTripEvent(db.Model):
  """Row linking one student to one field trip."""
  __tablename__ = "field_trip_events"
  id = db.Column(db.Integer, primary_key=True)
  student_id = db.Column(db.Integer, db.ForeignKey("students.id", ondelete='CASCADE'))
  field_trip_id = db.Column(db.Integer, db.ForeignKey("field_trips.id", ondelete='CASCADE'))

  def __init__(self, student_id, field_trip_id):
    """Link the student ``student_id`` to the trip ``field_trip_id``."""
    self.student_id, self.field_trip_id = student_id, field_trip_id

  def get_field_trip(self):
    """Return the FieldTrip this row references, or None when none matches."""
    return db.session.query(FieldTrip).filter_by(id=self.field_trip_id).first()

class School(db.Model):
  """A uniquely named school with its students, teachers and coordinator links."""
  __tablename__ = "schools"
  id = db.Column(db.Integer, primary_key=True)
  name = db.Column(db.String(255), unique=True)
  students = db.relationship('Student', backref='school', lazy='dynamic')
  teachers = db.relationship('Teacher', lazy='dynamic')
  # teacher_id = db.Column(db.Integer, db.ForeignKey("teachers.id"))
  coordinator_joins = db.relationship('SchoolCoordinatorJoin', backref='ownedschool', lazy='dynamic')

  def __init__(self, name):
    """Create a school called ``name``."""
    self.name = name

  def get_defaults(self):
    """Return the school's id and name as a dict."""
    return dict(id=self.id, name=self.name)

  def get_teachers_names(self):
    """Return the names of the school's teachers, or None when it has none."""
    staff = db.session.query(Teacher).filter_by(school_id=self.id).all()
    names = [member.name for member in staff]
    # An empty list is reported as None, not [].
    return names or None

  def get_teacher_count(self):
    """Return how many Teacher rows reference this school."""
    staff = db.session.query(Teacher).filter_by(school_id=self.id)
    return staff.count()

  def get_student_count(self):
    """Return how many Student rows reference this school."""
    pupils = db.session.query(Student).filter_by(school_id=self.id)
    return pupils.count()

  def update_all(self, name):
    """Rename the school; the caller commits the session."""
    self.name = name

class SchoolCoordinatorJoin(db.Model):
  """Association row linking a user to a school.

  ``User.can_view_student`` and ``User.can_view_school`` count these rows to
  decide whether a coordinator may see a school.
  """
  __tablename__ = 'school_coordinator_join'
  id = db.Column(db.Integer, primary_key=True)
  # Both foreign keys are declared with ON DELETE CASCADE.
  school_id = db.Column(db.Integer, db.ForeignKey("schools.id", ondelete='CASCADE'))
  user_id = db.Column(db.Integer, db.ForeignKey("user.id", ondelete='CASCADE'))

  def __init__(self, school_id, user_id):
    """Link the user ``user_id`` to the school ``school_id``."""
    self.school_id = school_id
    self.user_id = user_id

class Survey(db.Model):
  """A named survey, optionally tied to a field trip, with questions and responses."""
  __tablename__ = "surveys"
  id = db.Column(db.Integer, primary_key=True)
  is_mentoring_survey = db.Column(db.Boolean)
  name = db.Column(db.String(255))
  survey_questions = db.relationship('SurveyQuestion', backref='survey', lazy='dynamic')
  survey_responses = db.relationship('SurveyResponse', backref='source_survey', lazy='dynamic')
  field_trip_id = db.Column(db.Integer, db.ForeignKey("field_trips.id"))

  def __init__(self, name, is_mentoring_survey, field_trip_id=None):
    """Create a survey; ``field_trip_id`` may be left as None."""
    self.name, self.is_mentoring_survey = name, is_mentoring_survey
    self.field_trip_id = field_trip_id

  def get_defaults(self):
    """Return the survey's fields as a dict; trip id and flag are ``str()``-rendered."""
    trip_text = str(self.field_trip_id)
    mentoring_text = str(self.is_mentoring_survey)
    return dict(id=self.id, name=self.name, field_trip_id=trip_text, is_mentoring=mentoring_text)

  def update_all(self, name, is_mentoring, field_trip_id):
    """Overwrite name, mentoring flag and trip link; the caller commits the session."""
    self.name, self.is_mentoring_survey = name, is_mentoring
    self.field_trip_id = field_trip_id

class Question(db.Model):
  """A reusable question: its text and a free-form type string."""
  __tablename__ = "questions"
  id = db.Column(db.Integer, primary_key=True)
  text = db.Column(db.String(255))
  question_type = db.Column(db.String(255))
  survey_questions = db.relationship('SurveyQuestion', backref='source_question', lazy='dynamic')

  def __init__(self, text, type):
    """Create a question; ``type`` is stored in ``question_type``."""
    self.text, self.question_type = text, type

  def get_defaults(self):
    """Return id, text and type as a dict."""
    return dict(id=self.id, text=self.text, type=self.question_type)

  def update_all(self, text):
    """Replace the question text; the type is left untouched."""
    self.text = text

  def get_question_text(self):
    """Return the question text."""
    wording = self.text
    return wording

class SurveyQuestion(db.Model):
  """Placement of a Question on a Survey, with answer choices and an optional metric."""
  __tablename__ = "survey_questions"
  id = db.Column(db.Integer, primary_key=True)
  survey_id = db.Column(db.Integer, db.ForeignKey("surveys.id", ondelete='CASCADE'))
  question_id = db.Column(db.Integer, db.ForeignKey("questions.id"))
  answer_choices = db.relationship('SurveyQuestionAnswer', backref='owning_question', lazy='dynamic')
  metric_id = db.Column(db.Integer, db.ForeignKey("metrics.id"))

  def __init__(self, survey_id, question_id):
    """Attach the question ``question_id`` to the survey ``survey_id``."""
    self.survey_id, self.question_id = survey_id, question_id

  def get_question_text(self):
    """Return the text of the referenced Question (looked up by id)."""
    lookup = db.session.query(Question).filter_by(id=self.question_id)
    return lookup.first().text

  def get_answer_choices(self):
    """Return every SurveyQuestionAnswer whose ``question_id`` is this row's id."""
    return db.session.query(SurveyQuestionAnswer).filter_by(question_id=self.id).all()

  def get_identifying_name(self):
    """Return ``survey_<survey id>_question_<question id>_<question type>``."""
    parts = (self.survey_id, self.question_id, self.source_question.question_type)
    return "survey_%d_question_%d_%s" % parts

  def get_question_type(self):
    """Return the type string of the Question reached via ``source_question``."""
    origin = self.source_question
    return origin.question_type

class SurveyQuestionAnswer(db.Model):
  """One selectable choice on a survey question, backed by a PresetAnswer."""
  __tablename__ = "survey_question_answers"
  id = db.Column(db.Integer, primary_key=True)
  question_id = db.Column(db.Integer, db.ForeignKey("survey_questions.id", ondelete='CASCADE'))
  preset_answer_id = db.Column(db.Integer, db.ForeignKey("preset_answers.id"))
  answers = db.relationship('Answer', backref='answer_choice', lazy='dynamic')

  def __init__(self, question_id, preset_answer_id):
    """Offer the preset ``preset_answer_id`` as a choice on ``question_id``."""
    self.question_id, self.preset_answer_id = question_id, preset_answer_id

  def get_answer_value(self):
    """Return ``answer_value`` of the preset reached via ``source_answer``."""
    preset = self.source_answer
    return preset.answer_value

  def get_answer_text(self):
    """Return ``answer_text`` of the preset reached via ``source_answer``."""
    preset = self.source_answer
    return preset.answer_text


class PresetAnswer(db.Model):
  """A reusable answer option: display text plus an integer value."""
  __tablename__ = "preset_answers"
  id = db.Column(db.Integer, primary_key=True)
  answer_choices = db.relationship('SurveyQuestionAnswer', backref='source_answer', lazy='dynamic')
  answer_text = db.Column(db.String(255))
  answer_value = db.Column(db.Integer)

  def __init__(self, answer_text, answer_value):
    """Create a preset with the given text and value."""
    self.answer_text, self.answer_value = answer_text, answer_value

  def get_defaults(self):
    """Return id, text and value as a dict (used to pre-fill editing)."""
    return dict(id=self.id, answer_text=self.answer_text, answer_value=self.answer_value)

  def update_all(self, text, value):
    """Overwrite text and value; the caller commits the session."""
    self.answer_text, self.answer_value = text, value

class Answer(db.Model):
  """A student's answer to one question within a survey response.

  An answer is either a selected choice (``survey_question_answer_id``) or
  free text (``other_answer_text``). The getters return None instead of
  raising when a referenced row cannot be found.
  """
  __tablename__ = "answers"
  id = db.Column(db.Integer, primary_key=True)
  survey_question_answer_id = db.Column(db.Integer, db.ForeignKey("survey_question_answers.id", ondelete='CASCADE'))
  student_id = db.Column(db.Integer, db.ForeignKey("students.id"))
  question_id = db.Column(db.Integer, db.ForeignKey("questions.id", ondelete='CASCADE'))
  survey_response_id = db.Column(db.Integer, db.ForeignKey("survey_responses.id", ondelete='CASCADE'))
  other_answer_text = db.Column(db.Text())

  def __init__(self, student_id, question_id, survey_response_id, survey_question_answer_id=None, other_answer_text=None):
    """Create an answer; pass a choice id, free text, or neither."""
    self.survey_question_answer_id = survey_question_answer_id
    # NOTE(review): ``sqa`` is a plain attribute, not a column, and SQLAlchemy
    # does not run __init__ when loading rows, so it only exists on freshly
    # constructed instances -- confirm whether any caller relies on it.
    self.sqa = db.session.query(SurveyQuestionAnswer).filter_by(id=self.survey_question_answer_id).first() if self.survey_question_answer_id else None
    self.survey_response_id = survey_response_id
    self.student_id = student_id
    self.question_id = question_id
    self.other_answer_text = other_answer_text

  def _get_choice(self):
    """Return the selected SurveyQuestionAnswer row, or None when none matches."""
    return db.session.query(SurveyQuestionAnswer).filter_by(id=self.survey_question_answer_id).first()

  def _get_preset_answer(self):
    """Return the PresetAnswer behind the selected choice, or None.

    None covers: no choice selected, the choice row is missing, or the preset
    row is missing.
    """
    if not self.survey_question_answer_id:
      return None
    choice = self._get_choice()
    if choice is None:
      return None
    return db.session.query(PresetAnswer).filter_by(id=choice.preset_answer_id).first()

  def get_question_text(self):
    """Return the text of the answered Question, or None when it is missing."""
    question = db.session.query(Question).filter_by(id=self.question_id).first()
    if question is None:
      return None
    return question.get_question_text()

  def get_answer_text(self):
    """Return the chosen preset's text, or the free text when no choice is set.

    Returns None when a choice id is set but its preset cannot be resolved.
    """
    if not self.survey_question_answer_id:
      return self.other_answer_text
    preset = self._get_preset_answer()
    return preset.answer_text if preset is not None else None

  def get_answer_value(self):
    """Return the chosen preset's numeric value, or None.

    None is returned for free-text answers and when the preset cannot be
    resolved.
    """
    preset = self._get_preset_answer()
    return preset.answer_value if preset is not None else None

  def get_question_metric(self):
    """Return the name of the metric attached to the answered survey question.

    Returns None when there is no choice, no survey question, or no metric.
    """
    choice = self._get_choice()
    if not choice:
      return None
    survey_question = db.session.query(SurveyQuestion).filter_by(id=choice.question_id).first()
    if survey_question is None:
      return None
    metric = db.session.query(Metric).filter_by(id=survey_question.metric_id).first()
    if metric:
      return metric.name
    return None

  def get_question_type(self):
    """Return the type string of the answered Question, or None when it is missing."""
    question = db.session.query(Question).filter_by(id=self.question_id).first()
    if question is None:
      return None
    return question.question_type

class SurveyResponse(db.Model):
  """One student's submission of a survey, owning a set of Answer rows."""
  __tablename__ = "survey_responses"
  id = db.Column(db.Integer, primary_key=True)
  student_id = db.Column(db.Integer, db.ForeignKey("students.id", ondelete='CASCADE'))
  date = db.Column(db.DateTime)
  answers = db.relationship("Answer", backref="owning_survey", lazy='dynamic')
  survey_id = db.Column(db.Integer, db.ForeignKey("surveys.id", ondelete='CASCADE'))

  def __init__(self, survey_id, student_id):
    """Create a response stamped with the current local time."""
    self.survey_id = survey_id
    self.student_id = student_id
    self.date = datetime.datetime.now()

  def _get_answers(self):
    """Return every Answer row belonging to this response."""
    return db.session.query(Answer).filter(Answer.survey_response_id == self.id).all()

  def calculate_metrics(self):
    """Return per-metric statistics for this response.

    The result maps every metric name to ``{"average": ..., "data": [...]}``
    where ``data`` is a list of ``(answer text, answer value)`` pairs. Answers
    without a value count as 0 in the average; a metric with no answers gets
    ``{"average": 0, "data": []}``.
    """
    # Load the answers and resolve each answer's metric once, rather than
    # repeating both for every metric (each resolution costs several queries).
    answers_by_metric = {}
    for answer in self._get_answers():
      answers_by_metric.setdefault(answer.get_question_metric(), []).append(answer)

    result = {}
    for metric in db.session.query(Metric).all():
      matching = answers_by_metric.get(metric.name, [])
      if not matching:
        result[metric.name] = {
          "average": 0,
          "data": []
        }
        continue
      pairs = [(a.get_answer_text(), a.get_answer_value()) for a in matching]
      scores = [value if value is not None else 0 for _, value in pairs]
      result[metric.name] = {
        "average": float(sum(scores)) / len(scores),
        "data": pairs
      }
    return result

  def serialize_without_metrics(self):
    """Return ``{'id': ..., <question text>: <answer text>, ...}`` for this response."""
    obj = {'id': self.id}
    for answer in self._get_answers():
      obj[answer.get_question_text()] = answer.get_answer_text()
    return obj

  def serialize_with_metrics(self):
    """Return the plain serialization plus a ``'metrics'`` entry from ``calculate_metrics``."""
    obj = self.serialize_without_metrics()
    obj['metrics'] = self.calculate_metrics()
    return obj

class Metric(db.Model):
  """A named metric that survey questions can be attached to."""
  __tablename__ = "metrics"
  id = db.Column(db.Integer, primary_key=True)
  name = db.Column(db.String(255))
  questions = db.relationship("SurveyQuestion", backref="metric", lazy="dynamic")

  def __init__(self, name):
    """Create a metric called ``name``."""
    self.name = name

  def calculate_survey_average(self, survey_id):
    """Return this metric's mean over all responses to ``survey_id``.

    The per-response averages are averaged and rounded to one decimal;
    0.0 is returned when the survey has no responses.
    """
    responses = db.session.query(SurveyResponse).filter(SurveyResponse.survey_id == survey_id).all()
    if not responses:
      return 0.0
    total = sum(self.calculate_response_average(response.id) for response in responses)
    return round(float(total) / len(responses), 1)

  def calculate_response_average(self, survey_response_id):
    """Return this metric's mean answer value within one survey response.

    Only answers whose question metric has this metric's name are counted.
    Answers without a value count as 0, matching
    ``SurveyResponse.calculate_metrics`` (previously a None value made
    ``sum()`` raise). 0.0 is returned when no answer matches.
    """
    answers = db.session.query(Answer).filter(Answer.survey_response_id == survey_response_id).all()
    values = [a.get_answer_value() for a in answers if a.get_question_metric() == self.name]
    if not values:
      return 0.0
    scores = [value if value is not None else 0 for value in values]
    return float(sum(scores)) / len(scores)