keyboard-hook/usr/bin/server.py

130 lines
3.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import logging
import sys
import os
from socket import *
import fcntl
import time
from multiprocessing import Process,Queue
fifo = '/tmp/process_fifo.txt'
# Setup simple logging
logging.basicConfig(level=logging.INFO)
NULL_CHAR = chr(0)
zero = NULL_CHAR*8
# Process1 logic
def process1(error_queue):
process1_logger = logging.getLogger('process1')
process1_logger.info(f"Pid:{os.getpid()}")
#''代表服务器为 localhost
myHost = ''
#在一个非保留端口号上进行监听
myPort = 50009
#设置一个TCP socket对象
sockobj = socket(AF_INET, SOCK_STREAM)
#绑定端口号
sockobj.bind((myHost, myPort))
#监听允许5个连结
sockobj.listen(5)
#直到进程结束时才结束循环
while True:
try:
#等待客户端连接
connection, address = sockobj.accept()
#连接是一个新的socket
print ('Server connected by', address)
while True:
#读取客户端套接字的下一行
data = connection.recv(1024)
#如果没有数量的话,那么跳出循环
if not data: break
if data == b"hid open":
os.popen('ls /sys/class/udc | xargs echo > /sys/kernel/config/usb_gadget/g1/UDC')
print("hid open")
elif data == b"hid close":
os.popen('echo > /sys/kernel/config/usb_gadget/g1/UDC')
print("hid close")
else:
print(data)
file = os.open(fifo, os.O_WRONLY)
os.write(file,data)
process1_logger.info(f"Writing: {data}")
os.close(file)
#发送一个回复至客户端
connection.send(b'server => ' + data)
#当socket关闭时eof
connection.close()
except Exception as e:
error_queue.put(e)
# Log completion
process1_logger.info("Finished process 1")
# Process2 logic
def process2(error_queue):
process2_logger = logging.getLogger('process2')
process2_logger.info(f"Pid:{os.getpid()}")
# Keep attempting to open the fifo, ignore race condition failures
while True:
try:
file = os.open(fifo, os.O_RDONLY | os.O_NONBLOCK)
#fcntl.fcntl(file, fcntl.F_SETFL , os.O_NONBLOCK)
file = os.open(fifo, os.O_RDONLY)
break
except:
pass
while True:
try:
report = os.read(file,1024)
if report != b'':
fd = open('/dev/hidg0','rb+')
process2_logger.info(f"Writing {report}")
fd.write(report)
fd.write(zero.encode())
fd.flush()
fd.close()
except Exception as e:
error_queue.put(e)
os.close(file)
# Log completion
process2_logger.info("Finished process 2")
pass
# Main
def main():
if not os.path.exists(fifo):
# Create a fifo, os.mkfifo will block until there is a reader (process2)
os.mkfifo(fifo)
# Setup parent logger and log pid
parent_logger = logging.getLogger('parent')
parent_logger.info(f"Pid:{os.getpid()}")
error_queue = Queue(0)
# Setup processes
procs = [Process(target=process1,args=(error_queue,)), Process(target=process2,args=(error_queue,))]
# Start processes
for proc in procs:
proc.start()
while True:
if not error_queue.empty():
error_flag = error_queue.get()
if error_flag != None:
print(error_flag)
for proc in procs:
proc.terminate()
proc.join()#终止所有子进程
os.unlink(fifo)
sys.exit(1)#终止主进程
# Execute main
if __name__ == '__main__':
main()