HOME/Articles/

pil example mjpgserver class (snippet)

Article Outline

Python pil example 'mjpgserver class'

Functions in program:

  • def pull():
  • def pull():
  • def pull():

Modules used in program:

  • import gevent

python mjpgserver class

Python pil example: mjpgserver class

#!/usr/bin/python3.4
"""

Made by: David Smerkous - [email protected]
A simple, class-based MJPEG server written in Python, for easy use in
any video streaming application or for testing.

License: GPL3+
Python version: 3.4+ (Could easily be transferred to <3.4 with some changes)

Original post/idea: https://gist.github.com/n3wtron/4624820
Made for: I decided to share this in case another FRC team needs a
Python MJPEG streamer for their robot. In short, it is just an idea
thrown together by me for team 5431: https://github.com/frc5431

"""

from __future__ import print_function

from urllib.parse import parse_qs

from gevent import monkey
import gevent

monkey.patch_socket()
monkey.patch_dns()
monkey.patch_time()
monkey.patch_subprocess()

from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from time import sleep, time
from io import BytesIO
from threading import Thread

try:
    import cv2
except ImportError:
    pass


# noinspection PyBroadException
class MjpgServer(BaseHTTPRequestHandler):
    """Small MJPEG-over-HTTP server fed by a user supplied frame callback.

    A callback is attached with :meth:`attach_opencv`, :meth:`attach_pil` or
    :meth:`attach_raw`; it must return an ``(ok, frame)`` pair every time it
    is called.  The server then exposes, for ``name='cam'``:

    * ``/cam.mjpg``  - endless ``multipart/x-mixed-replace`` JPEG stream
    * ``/cam.html``  - (also ``/``) minimal page embedding the stream
    * ``/cam.jpeg``  - a single JPEG snapshot
    * ``.../fps`` and ``.../quality`` - current settings as plain text
    * ``POST`` with form fields ``fps`` / ``quality`` - change the settings

    NOTE: this object is both the public API and the request handler.  The
    bound base-class initialiser is handed to the HTTP server as its handler
    "class", so every request re-initialises this one instance.  Request
    state such as ``self.wfile`` is therefore shared between the server
    threads; the stream loop keeps its own reference to its socket writer for
    that reason.
    """

    def __init__(self, bind_addr='localhost', port=8080, name='cam'):
        """Create the listening server (binds the socket immediately).

        :param bind_addr: address to bind to
        :param port: TCP port to listen on
        :param name: base name used in the served URLs (``<name>.mjpg`` ...)
        """
        class THTTPS(ThreadingMixIn, HTTPServer):
            pass

        # The server calls this bound initialiser for every connection, which
        # runs the normal handler life-cycle on this very instance.
        self._h_server = THTTPS((bind_addr, port), super().__init__)
        self._name = str(name)
        self._port = port
        self._bind = bind_addr
        self._ext_types = ('.mjpg', '.html', '.jpeg', 'fps', 'quality')
        self._methods = ('open', 'pil', 'raw')
        self._cur_method = 'open'
        self._pull_method = None
        self._fps = -1          # <= 0 means "do not throttle"
        self._real_fps = 0      # last measured stream rate
        self._measure_fps = False
        self._cur_size = 1      # size in bytes of the last streamed frame
        self._quality = -1      # -1 means "encoder default"

    def set_fps(self, fps=-1):
        """Limit the stream to ``fps`` frames per second (``-1`` = unlimited)."""
        self._fps = int(fps)

    def set_quality(self, quality=-1):
        """Set the JPEG quality used when encoding (``-1`` = encoder default)."""
        # Coerced like set_fps so the encoders and the '%d' status endpoint
        # always see an int.
        self._quality = int(quality)

    def get_fps(self):
        """Return the last measured stream rate and enable measuring.

        Measuring only starts after the first call, so the first value
        returned is the initial ``0``.
        """
        self._measure_fps = True
        return self._real_fps

    def get_band_usage(self):
        """Return last frame size (bytes) / 1024 / 1024 times the measured fps."""
        return ((self._cur_size / 1024) / 1024) * self.get_fps()

    def attach_opencv(self, method):
        """Use ``method`` as frame source; frames are encoded with OpenCV."""
        self._pull_method = method
        self._cur_method = self._methods[0]

    def attach_pil(self, method):
        """Use ``method`` as frame source; frames are saved via ``img.save``."""
        self._pull_method = method
        self._cur_method = self._methods[1]

    def attach_raw(self, method):
        """Use ``method`` as frame source; frames are sent as given (str/bytes)."""
        self._pull_method = method
        self._cur_method = self._methods[2]

    def start(self, threaded=False):
        """Serve requests.

        :param threaded: when false, block in ``serve_forever`` and return
            ``None`` once it ends; when true, serve from a daemon thread.
        :return: the started ``Thread`` when ``threaded`` is true, else ``None``
        :raises RuntimeError: if no frame callback has been attached
        """
        if self._pull_method is None:
            raise RuntimeError('Request method not found')

        print("Starting mjpeg server http://%s:%d/%s.html" % (self._bind, self._port, self._name))
        if not threaded:
            self._h_server.serve_forever()
            return None

        mjpg_thread = Thread(target=self._h_server.serve_forever)
        # Attribute form instead of the deprecated setDaemon().
        mjpg_thread.daemon = True
        mjpg_thread.start()
        return mjpg_thread

    def __write(self, message):
        """Write ``message`` to the client, encoding ``str`` as UTF-8."""
        if isinstance(message, str):
            self.wfile.write(message.encode('utf8'))
        else:
            self.wfile.write(message)

    def __get_cv_encoded(self):
        """Pull a frame and JPEG-encode it with OpenCV; ``None`` if the pull failed."""
        ok, img = self._pull_method()
        if not ok:
            return None
        # cv2.imencode expects BGR channel order, so the frame is encoded
        # as-is; converting to RGB first would swap red and blue in the JPEG.
        # Assumes the callback returns OpenCV-style BGR frames (as
        # cv2.VideoCapture.read does) - TODO confirm for other sources.
        if self._quality > -1:
            im_enc = cv2.imencode('.jpeg', img, [cv2.IMWRITE_JPEG_QUALITY, self._quality])[1]
        else:
            im_enc = cv2.imencode('.jpeg', img)[1]
        return bytearray(im_enc)

    def __get_pil_encoded(self):
        """Pull an image and save it as JPEG via ``img.save``; ``None`` if the pull failed."""
        ok, img = self._pull_method()
        if not ok:
            return None
        out_stream = BytesIO()
        if self._quality > -1:
            img.save(out_stream, 'JPEG', quality=self._quality)
        else:
            img.save(out_stream, 'JPEG')
        return out_stream.getvalue()

    def __get_raw_encoded(self):
        """Pull already encoded data; ``None`` if the pull failed.

        ``str`` is UTF-8 encoded, ``bytes``/``bytearray`` are passed through,
        any other type yields empty bytes.
        """
        ok, img = self._pull_method()
        if not ok:
            return None
        out_stream = BytesIO()
        if isinstance(img, str):
            out_stream.write(img.encode('utf8'))
        elif isinstance(img, (bytearray, bytes)):
            out_stream.write(img)
        return out_stream.getvalue()

    def __sel_enc(self):
        """Fetch one encoded frame using the currently attached source type."""
        if self._cur_method == self._methods[0]:
            enc = self.__get_cv_encoded()
        elif self._cur_method == self._methods[1]:
            enc = self.__get_pil_encoded()
        else:
            enc = self.__get_raw_encoded()
        return enc

    def __handle_mjpeg(self):
        """Stream frames as ``multipart/x-mixed-replace`` until an error occurs.

        Any exception while pulling, encoding or writing (typically the
        client disconnecting) ends the stream.
        """
        try:
            self.send_response(200)
            self.send_header('Content-type', 'multipart/x-mixed-replace; boundary=--jpgboundary')
            self.end_headers()
        except Exception:
            return
        # Keep our own writer: self.wfile is replaced whenever another request
        # is set up on this shared handler object.
        wfile = self.wfile
        last_frame = 0.0
        window_start = time()
        frames_in_window = 0
        while 1:
            fps = self._fps
            if fps > 0:
                # Sleep only for what is left of the frame interval; values
                # <= 0 (including 0 set through POST) mean "unlimited".
                wait = (1.0 / fps) - (time() - last_frame)
                if wait > 0:
                    gevent.sleep(wait)
                last_frame = time()
            try:
                enc = self.__sel_enc()
                if not enc:
                    # Do not spin at full speed while the source has no frame.
                    gevent.sleep(0.01)
                    continue
                if self._measure_fps:
                    frames_in_window += 1
                    # Average over roughly one second worth of frames, or over
                    # every single frame when unthrottled.
                    if frames_in_window >= max(fps, 1):
                        elapsed = time() - window_start
                        if elapsed > 0:
                            self._real_fps = frames_in_window / elapsed
                        window_start = time()
                        frames_in_window = 0
                self._cur_size = len(enc)
                # Each part: boundary line, part headers, blank line, payload,
                # CRLF.  Written by hand so the shared header buffer of the
                # handler is not involved.
                part_header = ('--jpgboundary\r\n'
                               'Content-type: image/jpeg\r\n'
                               'Content-length: %d\r\n\r\n' % self._cur_size).encode('ascii')
                wfile.write(part_header)
                wfile.write(enc)
                wfile.write(b'\r\n')
            except Exception:
                break

    def __handle_html(self):
        """Serve a minimal page that embeds the stream."""
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        # Host-relative URL so the page also works for clients that are not on
        # the machine running the server.
        self.wfile.write(("<html><head></head><body><img src=\"/%s.mjpg\"/></body></html>" %
                          self._name).encode('utf8'))

    def __handle_jpeg(self):
        """Serve a single JPEG snapshot, or an error if no frame is available."""
        enc = self.__sel_enc()
        if enc is None:
            # NOTE(review): 505 is kept because clients already see it; 500
            # would describe this failure more accurately.
            self.send_error(505, 'Error parsing image')
            return
        self.send_response(200)
        self.send_header('Content-type', 'image/jpeg')
        self.send_header('Content-length', len(enc))
        self.end_headers()
        self.wfile.write(enc)

    def do_POST(self):
        """
        Do a post request to the server to
        set settings such as quality and fps

        The body is parsed as a URL-encoded form; the fields ``fps`` and
        ``quality`` (case-insensitive) must be integers.  Malformed requests
        are answered with 400 and change nothing.
        """
        try:
            length = int(self.headers.get('Content-Length') or 0)
        except (TypeError, ValueError):
            self.send_error(400, 'Invalid Content-Length')
            return
        if length < 0:
            self.send_error(400, 'Invalid Content-Length')
            return

        try:
            post_data = parse_qs(self.rfile.read(length).decode('utf-8'))
        except UnicodeDecodeError:
            self.send_error(400, 'Request body is not valid UTF-8')
            return

        # Validate everything first so a bad field cannot leave the settings
        # half updated.
        new_values = {}
        for key, value in post_data.items():
            key = key.lower()
            if key in ('fps', 'quality'):
                try:
                    new_values[key] = int(value[0])
                except ValueError:
                    self.send_error(400, 'Invalid value for %s' % key)
                    return

        if 'fps' in new_values:
            self._fps = new_values['fps']
            print("Setting new fps: %s" % self._fps)
        if 'quality' in new_values:
            self._quality = new_values['quality']
            print("Setting new quality: %s" % self._quality)
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

    def do_GET(self):
        """Dispatch a GET request on the suffix of the requested path."""
        ret_path = self.path

        if ret_path.endswith(self._name + self._ext_types[0]):
            self.__handle_mjpeg()
        elif ret_path.endswith(self._name + self._ext_types[1]) or ret_path == "/":
            self.__handle_html()
        elif ret_path.endswith(self._name + self._ext_types[2]):
            self.__handle_jpeg()
        elif ret_path.endswith(self._ext_types[3]):
            self.send_response(200)
            self.send_header('Content-type', 'text/plain')
            self.end_headers()
            self.wfile.write(("%d" % self._fps).encode('utf8'))
        elif ret_path.endswith(self._ext_types[4]):
            self.send_response(200)
            self.send_header('Content-type', 'text/plain')
            self.end_headers()
            self.wfile.write(("%d" % self._quality).encode('utf8'))
        else:
            self.send_error(404, "This server only accepts %s(%s, %s, %s)" % (self._name, self._ext_types[0],
                                                                              self._ext_types[1],
                                                                              self._ext_types[2]))
        return

'''
Opencv capture example
'''
# Module-level capture object read by pull() below; opened on import using
# device index 0.
cap = cv2.VideoCapture(0)
# Request a 1920x1080 frame size and a saturation of 0.5.
# NOTE(review): whether the device honours these requests is not checked
# here - the return values of set() are ignored.
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
cap.set(cv2.CAP_PROP_SATURATION, 0.5)


def pull():
    """Read one frame from the module-level ``cap`` capture object.

    Returns the result of ``cap.read()`` unchanged.
    """
    grabbed = cap.read()
    return grabbed

'''
PIL
arr = '\x30\x12...'

def pull():
   img = Image.fromarray(arr)
   ok = img is not None
   return ok, img
'''

'''
RAW (Byte array or string)

def pull():
    img = bytearray(arr)
    ok = False
    if len(img) > 0:
        ok = True
    return ok, img

'''

if __name__ == '__main__':
    # Demo: stream the OpenCV capture at JPEG quality 50 from a background
    # thread, then report statistics every five seconds.
    server = MjpgServer(port=8080, name="cam")
    server.attach_opencv(pull)
    server.set_quality(50)
    # Pass threaded=False instead if the server should be the only thing running.
    server.start(threaded=True)

    # Keep the main thread alive; the server thread is a daemon.
    while True:
        # 'Real' fps as measured by the stream loop.
        measured_fps = server.get_fps()
        print("FPS: %d" % measured_fps)
        # Bandwidth estimate derived from the last frame size and the real fps.
        bandwidth = server.get_band_usage()
        print("BAND(B): %.2f" % (bandwidth))
        sleep(5)