HOME/Articles/

osc

Article Outline

Example Python program osc.py This program creates a PyQt GUI Python version 3.x or newer. To check the Python version use:

python --version

Modules

  • import sys
  • from PyQt5.QtCore import (
  • from PyQt5.QtWidgets import (
  • from pythonosc import (
  • import threading

Classes

  • class MainWindow(QMainWindow):
  • class SendWidget(QWidget):
  • class ReceiveWidget(QWidget):
  • class Communicate(QObject):

Methods

  • def init(self):
  • def initUI(self):
  • def closeEvent(self, event):
  • def init(self):
  • def initUI(self):
  • def send(self):
  • def init(self):
  • def initUI(self):
  • def receive(self):
  • def handleReceivedMessage(addr, data):

Code

Example Python PyQt program :

import sys

from PyQt5.QtCore import (
    pyqtSignal,
    QObject
)

from PyQt5.QtWidgets import (
    QMainWindow,
    QWidget,
    QPushButton,
    QLabel,
    QLineEdit,
    QTextEdit,
    QVBoxLayout,
    QHBoxLayout,
    QGridLayout,
    QApplication
)

from pythonosc import (
    osc_message_builder,
    udp_client,
    dispatcher,
    osc_server
)

import threading

class MainWindow(QMainWindow):

    def __init__(self):
        super().__init__()
        self.initUI()

    def initUI(self):
        self.sendWidget = SendWidget()
        self.receiveWidget = ReceiveWidget()

        layout = QVBoxLayout()
        layout.setSpacing(20)
        layout.addWidget(self.sendWidget)
        layout.addWidget(self.receiveWidget)

        centralWidget = QWidget()
        centralWidget.setLayout(layout)
        self.setCentralWidget(centralWidget)

        self.setGeometry(100, 100, 250, 300)
        self.setWindowTitle("OSC: SEND / RECEIVE")
        self.show()

    def closeEvent(self, event):
        print("closing")
        if self.receiveWidget.serverRunning:
            print("ReceiveWidget, deleting thread")
            self.receiveWidget.server.shutdown()
            self.receiveWidget.server.server_close()
            self.receiveWidget.serverRunning = False


class SendWidget(QWidget):

    def __init__(self):
        super().__init__()
        self.initUI()

    def initUI(self):
        labelIP = QLabel("SEND IP", self)
        self.IPEdit = QLineEdit()
        self.IPEdit.setText("127.0.0.1")
        labelPort = QLabel("SEND PORT", self)
        self.portEdit = QLineEdit()
        self.portEdit.setText("8000")
        labelAddress = QLabel("SEND ADDRESS", self)
        self.addressEdit = QLineEdit()
        self.addressEdit.setText("debug")
        labelData = QLabel("SEND DATA")
        self.dataEdit = QLineEdit()
        self.dataEdit.setText("1")
        button = QPushButton("SEND", self)

        button.clicked.connect(self.send)

        grid = QGridLayout();
        grid.setColumnMinimumWidth(0, 100)

        grid.addWidget(labelIP, 1, 0)
        grid.addWidget(self.IPEdit, 1, 1)

        grid.addWidget(labelPort, 2, 0)
        grid.addWidget(self.portEdit, 2, 1)

        grid.addWidget(labelAddress, 3, 0)
        grid.addWidget(self.addressEdit, 3, 1)

        grid.addWidget(labelData, 4, 0)
        grid.addWidget(self.dataEdit, 4, 1)

        grid.addWidget(button, 5, 1)

        self.setLayout(grid)
        self.show()

    def send(self):
        if self.IPEdit.text() == "" or self.portEdit.text() == "" or self.addressEdit.text() == "":
            print("ip, port or address empty")
            return

        client = udp_client.UDPClient(self.IPEdit.text(), int(self.portEdit.text()))
        msg = osc_message_builder.OscMessageBuilder(address = "/" + self.addressEdit.text())
        msg.add_arg(self.dataEdit.text())
        msg = msg.build()
        client.send(msg)


class ReceiveWidget(QWidget):

    serverRunning = False
    runningIP = ""
    runningPort = 0

    def __init__(self):
        super().__init__()
        self.initUI()

    def initUI(self):
        labelIP = QLabel("RECEIVE IP", self)
        self.IPEdit = QLineEdit()
        self.IPEdit.setText("127.0.0.1")
        labelPort = QLabel("RECEIVE PORT", self)
        self.portEdit = QLineEdit()
        self.portEdit.setText("8000")
        button = QPushButton("RECEIVE", self)

        grid = QGridLayout()
        grid.setColumnMinimumWidth(0, 100)
        grid.addWidget(labelIP, 0, 0)
        grid.addWidget(self.IPEdit, 0, 1)
        grid.addWidget(labelPort, 1, 0)
        grid.addWidget(self.portEdit, 1, 1)
        grid.addWidget(button, 2, 1)

        labelAddress = QLabel("ADDRESS", self)
        self.receivedAddress = QTextEdit()
        addressBox = QVBoxLayout()
        addressBox.addWidget(labelAddress)
        addressBox.addWidget(self.receivedAddress)

        labelData = QLabel("DATA")
        self.receivedData = QTextEdit()
        dataBox = QVBoxLayout()
        dataBox.addWidget(labelData)
        dataBox.addWidget(self.receivedData)

        receiveBox = QHBoxLayout()
        receiveBox.addLayout(addressBox)
        receiveBox.addLayout(dataBox)

        box = QVBoxLayout()
        box.addLayout(grid)
        box.addLayout(receiveBox)
        self.setLayout(box)

        button.clicked.connect(self.receive)
        self.show()

    def receive(self):
        if self.serverRunning:
            print("server already running")
            self.server.shutdown()
            self.server.server_close()

        ip = self.IPEdit.text()
        port = int(self.portEdit.text())

        class Communicate(QObject):
            receive = pyqtSignal(object, object)

        def handleReceivedMessage(addr, data):
            print(addr, ":", data)
            self.receivedAddress.append(addr)
            self.receivedData.append(data)

        self.c = Communicate()
        self.c.receive.connect(handleReceivedMessage)

        myDispatcher = dispatcher.Dispatcher()
        # myDispatcher.set_default_handler(handleReceivedMessage)
        myDispatcher.set_default_handler(self.c.receive.emit)

        self.server = osc_server.ThreadingOSCUDPServer((ip, port), myDispatcher)
        server_thread = threading.Thread(target=self.server.serve_forever)
        server_thread.start()
        print("thread start")

        self.serverRunning = True
        self.runningIP = ip
        self.runningPort = port


if __name__ == "__main__":
    app = QApplication(sys.argv)
    win = MainWindow()
    sys.exit(app.exec_())