#!/usr/bin/python2 # -*- coding: utf-8 -*- # sudo pip install tornado # # Websocket example with tornade web server. # # Gerhard Hepp, 2017 # import tornado.ioloop import tornado.web import tornado.websocket import threading import time import Queue # debug = True: set for a blink signal, no GPIO usage # debug = False: set for GPIO usage debug = False if not debug: import RPi.GPIO as GPIO # messages from Periphaeral Class to Websocket sendQueue= Queue.Queue() class PeripheralDebug: def __init__(self, sendQueue): self.sendQueue = sendQueue def start(self): self.runit = True blinkThread = threading.Thread(target=self.blink) blinkThread.start() def stop(self): self.runit = False def blink(self): cnt = 0 while self.runit: self.sendQueue.put( { 'data': 'on', 'cnt' : cnt } ) cnt += 1 time.sleep(0.5) self.sendQueue.put( { 'data': 'off', 'cnt' : cnt } ) cnt += 1 time.sleep(0.5) class PeripheralGPIO: def __init__(self, sendQueue): self.sendQueue = sendQueue GPIO.setmode(GPIO.BCM) GPIO.setwarnings(False) self.channel = 4 GPIO.setup(self.channel, GPIO.IN, pull_up_down=GPIO.PUD_UP) def start(self): self.runit = True blinkThread = threading.Thread(target=self.run) blinkThread.start() def stop(self): self.runit = False def run(self): cnt = 0 prev = None while self.runit: res = GPIO.input(self.channel) if prev != res: if res: self.sendQueue.put( { 'data': 'on', 'cnt' : cnt } ) else: self.sendQueue.put( { 'data': 'off', 'cnt' : cnt } ) cnt += 1 prev = res # for debouncing and limiting number of events per time time.sleep(0.01) class PeripheralFactory: @staticmethod def getPeripheral(debug): if debug: return PeripheralDebug(sendQueue) else: return PeripheralGPIO(sendQueue) peripheral = PeripheralFactory.getPeripheral(debug) peripheral.start() class MainHandler(tornado.web.RequestHandler): def get(self): self.write( """