HOME/Articles/

mysql example workshop (snippet)

Article Outline

Python mysql example 'workshop'

Modules used in program:

  • import logging
  • import re
  • import luigi

python workshop

Python mysql example: workshop

from edx.analytics.tasks.mysql_load import MysqlInsertTask

from edx.analytics.tasks.url import get_target_from_url, url_path_join, ExternalURL
from edx.analytics.tasks.pathutil import EventLogSelectionMixin
from edx.analytics.tasks.mapreduce import MapReduceJobTask
from edx.analytics.tasks.util import eventlog
import luigi
import re
from edx.analytics.tasks.util.record import Record, StringField, IntegerField
import logging

log = logging.getLogger(__name__)


class HelloWorldTask(luigi.Task):

    input_url = luigi.Parameter()
    output_url = luigi.Parameter()

    def requires(self):
        return ExternalURL(self.input_url)

    def run(self):
        with self.input().open(mode='r') as input_file:
            with self.output().open(mode='w') as output_file:
                for line in input_file:
                    output_file.write('Hello ' + line)

    def output(self):
        return get_target_from_url(self.output_url)


class HelloWorldMapReduceTask(EventLogSelectionMixin, MapReduceJobTask):

    output_root = luigi.Parameter()

    def mapper(self, line):
        value = self.get_event_and_date_string(line)
        if value is None:
            return
        event, _date_string = value

        key = 'hello ' + event.get('event_type', 'UNKNOWN')
        yield key, 1

    def reducer(self, key, values):
        yield key, sum(values)

    def output(self):
        return get_target_from_url(url_path_join(self.output_root, self.interval.to_string()) + '/')


class ViewDistribution(EventLogSelectionMixin, MapReduceJobTask):

    SUBSECTION_ACCESSED_PATTERN = r'/courses/.*?courseware/([^/]+)/([^/]+)/.*$'

    output_root = luigi.Parameter()

    def mapper(self, line):
        value = self.get_event_and_date_string(line)
        if value is None:
            return
        event, _date_string = value

        event_type = event.get('event_type')
        if event_type is None:
            return

        # Challenge #1
        # Use the SUBSECTION_ACCESSED_PATTERN to extract the section and subsection from the URL. The first match will
        # be the section and the second will be the subsection.
        m = re.match(self.SUBSECTION_ACCESSED_PATTERN, event_type)
        if not m:
            return
        section, subsection = m.group(1, 2)

        username = event.get('username', '').strip()
        if not username:
            return

        course_id = eventlog.get_course_id(event)
        if not course_id:
            return

        # Challenge #2
        # What should this method yield?
        yield (course_id, section, subsection), (username)

    def reducer(self, key, values):
        # Challenge #2 continued
        # I recommend assigning each element of your key to a variable here: foo, bar = key
        course_id, section, subsection = key

        # Challenge 3
        # Compute the total number of views for each (section, subsection) as well as the unique number of users who
        # viewed that subsection.
        unique_usernames = set()
        total_views = 0
        for username in values:
            unique_usernames.add(username)
            total_views += 1
        unique_user_views = len(unique_usernames)

        yield ViewRecord(
            course_id=course_id,
            section=section,
            subsection=subsection,
            unique_user_views=unique_user_views,
            total_views=total_views
        ).to_string_tuple()

    def output(self):
        return get_target_from_url(url_path_join(self.output_root, self.interval.to_string()) + '/')


class ViewRecord(Record):
    course_id = StringField(length=255, nullable=False)
    section = StringField(length=255, nullable=False)
    subsection = StringField(length=255, nullable=False)
    unique_user_views = IntegerField()
    total_views = IntegerField()


class ViewDistributionMysqlTask(MysqlInsertTask):

    output_root = luigi.Parameter()
    interval = luigi.DateIntervalParameter()
    n_reduce_tasks = luigi.IntParameter(significant=False)

    @property
    def table(self):
        return 'content_views'

    @property
    def columns(self):
        return ViewRecord.get_sql_schema()

    @property
    def indexes(self):
        return [
            ('course_id',)
        ]

    @property
    def insert_source_task(self):
        return ViewDistribution(
            interval=self.interval,
            output_root=self.output_root,
            n_reduce_tasks=self.n_reduce_tasks
        )