HOME/Articles/

socket example websocket test (snippet)

Article Outline

Python socket example 'websocket test'

Functions in program:

  • def on_open(ws):
  • def on_message(ws, message):
  • def on_close(ws):
  • def on_error(ws, error):

Modules used in program:

  • import threading, time, random
  • import sys
  • import time
  • import thread
  • import ssl
  • import websocket

python websocket test

Python socket example: websocket test

from __future__ import print_function
import websocket
import ssl
import thread
import time
import sys

import threading, time, random
count = 0
class Counter(threading.Thread):
    def __init__(self, lock, threadName,ws):
        super(Counter, self).__init__(name = threadName) 
        self.lock = lock
        self.ws = ws
    def run(self):
        global count
        self.lock.acquire()
        opt = {"cert_reqs": ssl.CERT_NONE}
        ws.run_forever(sslopt = opt)
        for i in xrange(10000):
            count = count + 1
        self.lock.release()
lock = threading.Lock()

def on_error(ws, error):
    print((error))
def on_close(ws):
    print(("### closed ###"))
def on_message(ws, message):
    print((message))
def on_open(ws):
    def run(*args):
        for i in range(1000):
            time.sleep(1)
            ws.send("Hello %d" % i)
        time.sleep(1)
        ws.close()
        print(("thread terminating..."))
    thread.start_new_thread(run, ())
if __name__ == "__main__":
    websocket.enableTrace(True)
    opt = {"cert_reqs": ssl.CERT_NONE}
    print("abd")
    for i in range(5): 
        ws = websocket.WebSocketApp("wss://server/ws/ws/",
                header=["appKey: key","appSecret:secret",
            "deviceToken:abc"],
            on_message = on_message,
                              on_error = on_error,
                              on_close = on_close)
        ws.on_open = on_open
        Counter(lock, "thread-" + str(i),ws).start()
        time.sleep(2)
    print(count)