260 lines
7.7 KiB
Python
Executable File
260 lines
7.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Very simple HTTP server in python for logging requests
|
|
Usage::
|
|
./server.py [<port>]
|
|
"""
|
|
import RPi.GPIO as GPIO
|
|
import http.server
|
|
import socketserver
|
|
import logging
|
|
import sys
|
|
import os
|
|
import subprocess
|
|
import cgi
|
|
import time
|
|
import json
|
|
import base64
|
|
|
|
fileObject = open("config.json", "r")
|
|
jsonContent = fileObject.read()
|
|
config = json.loads(jsonContent)
|
|
print(config)
|
|
|
|
GPIO.setwarnings(False)
|
|
os.system('pkill -9 httpd')
|
|
GPIO.setmode(GPIO.BCM)
|
|
pinList = [2, 3, 4, 17]
|
|
SleepTimeL=10
|
|
|
|
|
|
#https://forums.raspberrypi.com/viewtopic.php?t=24389
|
|
#
|
|
#YW5vdGhlcjptZQ==
|
|
#another:me
|
|
#
|
|
#admin:123456
|
|
#YWRtaW46MTIzNDU2
|
|
class S(http.server.SimpleHTTPRequestHandler):
|
|
|
|
|
|
def do_HEAD(self):
|
|
self.send_response(200, "OK")
|
|
self.send_header('Content-type', 'text/html')
|
|
self.end_headers()
|
|
|
|
def do_AUTHHEAD(self):
|
|
self.send_response(401)
|
|
self.send_header('WWW-Authenticate', 'Basic realm=\"Test\"')
|
|
self.send_header('Content-type', 'text/html')
|
|
self.end_headers()
|
|
|
|
def do_GET(self):
|
|
|
|
if self.headers['Authorization'] == None:
|
|
self.do_AUTHHEAD()
|
|
#self.wfile.write(bytes('no auth header received', 'UTF-8'))
|
|
pass
|
|
elif self.headers['Authorization'] == 'Basic YWRtaW46MTIzNDU2':
|
|
self.do_HEAD()
|
|
#self.wfile.write(bytes(self.headers['Authorization'], 'UTF-8'))
|
|
#self.wfile.write(bytes(' authenticated!', 'UTF-8'))
|
|
GPIO.setup(pinList[0], GPIO.OUT)
|
|
status0=int(GPIO.input(pinList[0]))
|
|
GPIO.setup(pinList[1], GPIO.OUT)
|
|
status1=int(GPIO.input(pinList[1]))
|
|
GPIO.setup(pinList[2], GPIO.OUT)
|
|
status2=int(GPIO.input(pinList[2]))
|
|
GPIO.setup(pinList[3], GPIO.OUT)
|
|
status3=int(GPIO.input(pinList[3]))
|
|
#logging.info("GET request,\nPath: %s\nHeaders:\n%s\n", str(self.path), str(self.headers))
|
|
#self.send_response(200, "OK")
|
|
#self.end_headers()
|
|
self._set_response()
|
|
pass
|
|
else:
|
|
self.do_AUTHHEAD()
|
|
self.wfile.write(bytes(self.headers['Authorization'], 'UTF-8'))
|
|
self.wfile.write(bytes(' not authenticated', 'UTF-8'))
|
|
logging.info('Someone wanted to get access without authorization')
|
|
file = read_html_template("login.html")
|
|
self.wfile.write(bytes(file.encode("utf-8")))
|
|
pass
|
|
|
|
|
|
|
|
def do_POST(self):
|
|
content_length = int(self.headers['Content-Length']) # <--- Gets the size of data
|
|
post_data = self.rfile.read(content_length) # <--- Gets the data itself
|
|
#logging.info("POST request,\nPath: %s\nHeaders:\n%s\n\nBody:\n%s\n",
|
|
#str(self.path), str(self.headers), post_data.decode('utf-8'))
|
|
#self._set_response()
|
|
|
|
|
|
|
|
def _set_response(self):
|
|
|
|
fileObject = open("config.json", "r")
|
|
jsonContent = fileObject.read()
|
|
config = json.loads(jsonContent)
|
|
print(config)
|
|
|
|
#time.sleep(5)
|
|
GPIO.setup(pinList[0], GPIO.OUT)
|
|
status0=int(GPIO.input(pinList[0]))
|
|
GPIO.setup(pinList[1], GPIO.OUT)
|
|
status1=int(GPIO.input(pinList[1]))
|
|
GPIO.setup(pinList[2], GPIO.OUT)
|
|
status2=int(GPIO.input(pinList[2]))
|
|
GPIO.setup(pinList[3], GPIO.OUT)
|
|
status3=int(GPIO.input(pinList[3]))
|
|
|
|
objParsedPath = str(format(self.path).encode('utf-8'))
|
|
#print ('objParsedPath="'+objParsedPath+'"')
|
|
|
|
if objParsedPath == "b'/R0'":
|
|
print ('R0 run')
|
|
if status0 == 1:
|
|
os.system('./relay.py open 0')
|
|
else:
|
|
os.system('./relay.py close 0')
|
|
time.sleep(int(SleepTimeL));
|
|
os.system('./relay.py open 0')
|
|
|
|
if objParsedPath == "b'/R0O'":
|
|
print ('R0 run0')
|
|
if status0 == 1:
|
|
os.system('./relay.py open 0')
|
|
else:
|
|
os.system('./relay.py close 0')
|
|
|
|
|
|
|
|
if objParsedPath == "b'/R1'":
|
|
print ('R1 run')
|
|
if status1 == 1:
|
|
os.system('./relay.py open 1')
|
|
else:
|
|
os.system('./relay.py close 1')
|
|
time.sleep(int(SleepTimeL));
|
|
os.system('./relay.py open 1')
|
|
|
|
if objParsedPath == "b'/R1O'":
|
|
print ('R1 run0')
|
|
if status1 == 1:
|
|
os.system('./relay.py open 1')
|
|
else:
|
|
os.system('./relay.py close 1')
|
|
|
|
|
|
if objParsedPath == "b'/R2'":
|
|
print ('R2 run')
|
|
if status2 == 1:
|
|
os.system('./relay.py open 2')
|
|
else:
|
|
os.system('./relay.py close 2')
|
|
time.sleep(int(SleepTimeL));
|
|
os.system('./relay.py open 2')
|
|
|
|
if objParsedPath == "b'/R2O'":
|
|
print ('R2 run0')
|
|
if status2 == 1:
|
|
os.system('./relay.py open 2')
|
|
else:
|
|
os.system('./relay.py close 2')
|
|
|
|
|
|
if objParsedPath == "b'/R3'":
|
|
print ('R3 run')
|
|
if status3 == 1:
|
|
os.system('./relay.py open 3')
|
|
else:
|
|
os.system('./relay.py close 3')
|
|
time.sleep(int(SleepTimeL));
|
|
os.system('./relay.py open 3')
|
|
|
|
if objParsedPath == "b'/R3O'":
|
|
print ('R3 run0')
|
|
if status3 == 1:
|
|
os.system('./relay.py open 3')
|
|
else:
|
|
os.system('./relay.py close 3')
|
|
|
|
GPIO.setup(pinList[0], GPIO.OUT)
|
|
status0=int(GPIO.input(pinList[0]))
|
|
GPIO.setup(pinList[1], GPIO.OUT)
|
|
status1=int(GPIO.input(pinList[1]))
|
|
GPIO.setup(pinList[2], GPIO.OUT)
|
|
status2=int(GPIO.input(pinList[2]))
|
|
GPIO.setup(pinList[3], GPIO.OUT)
|
|
status3=int(GPIO.input(pinList[3]))
|
|
|
|
|
|
|
|
file = read_html_template("index.html")
|
|
self.send_response(200, "OK")
|
|
self.end_headers()
|
|
#replace config in index.html
|
|
file = file.replace("DESIGN1",config['com1'])
|
|
file = file.replace("DESIGN2",config['com2'])
|
|
file = file.replace("DESIGN3",config['com3'])
|
|
file = file.replace("DESIGN4",config['com4'])
|
|
|
|
file = file.replace("URL_HTTP",config['url'])
|
|
file = file.replace("PORT_HTTP",str(config['port']))
|
|
|
|
print ('Relay0 = '+str(status0))
|
|
if status0 == 0:
|
|
file = file.replace("CHECKED1", "checked")
|
|
else:
|
|
file = file.replace("CHECKED1", "")
|
|
|
|
print ('Relay1 = '+str(status1))
|
|
if status1 == 0:
|
|
file = file.replace("CHECKED2", "checked")
|
|
else:
|
|
file = file.replace("CHECKED2", "")
|
|
|
|
print ('Relay2 = '+str(status2))
|
|
if status2 == 0:
|
|
file = file.replace("CHECKED3", "checked")
|
|
else:
|
|
file = file.replace("CHECKED3", "")
|
|
|
|
print ('Relay3 = '+str(status3))
|
|
if status3 == 0:
|
|
file = file.replace("CHECKED4", "checked")
|
|
else:
|
|
file = file.replace("CHECKED4", "")
|
|
|
|
self.wfile.write(bytes(file.encode("utf-8")))
|
|
|
|
|
|
|
|
|
|
def read_html_template(path):
|
|
"""function to read HTML file"""
|
|
try:
|
|
with open(path) as f:
|
|
file = f.read()
|
|
except Exception as e:
|
|
file = e
|
|
return file
|
|
|
|
def run(handler_class=S, port=8000):
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
server_address = ('', port)
|
|
#httpd = server_class(server_address, handler_class)
|
|
logging.info('Starting httpd '+str(port)+' ...\n')
|
|
try:
|
|
Handler = http.server.SimpleHTTPRequestHandler
|
|
with socketserver.TCPServer(server_address, handler_class) as httpd:
|
|
httpd.serve_forever()
|
|
except KeyboardInterrupt:
|
|
pass
|
|
httpd.server_close()
|
|
logging.info('Stopping httpd...\n')
|
|
|
|
if __name__ == '__main__':
|
|
run(port=config['port']) |