Commit 1cdb1702 authored by anthraxx's avatar anthraxx
Browse files

use own thread to consume from evdev input with fixed event queue size

parent 3e393f09
from evdev import InputDevice, ecodes
from fcntl import fcntl, F_SETFL, F_GETFL
from os import O_NONBLOCK, read
from threading import Thread
from collections import deque
class EvdevScanner(object):
def __init__(self, device):
self.device = InputDevice(device)
self.buffer = ""
self.lines = deque()
self.scancodes = {
2: u'1',
3: u'2',
......@@ -48,21 +51,26 @@ class EvdevScanner(object):
12: u'-'
}
self.device.grab()
self.thread = Thread(target=self.loop, daemon=True)
self.thread.start()
def loop(self):
for event in self.device.read_loop():
if event.type != ecodes.EV_KEY or event.value != 1:
continue
scanned_input = self.scancodes.get(event.code)
if scanned_input == "\n":
line = self.buffer
self.buffer = ""
self.lines.append(line)
continue
self.buffer += scanned_input or ""
def read(self):
try:
for event in self.device.read():
if event.type != ecodes.EV_KEY or event.value != 1:
continue
scanned_input = self.scancodes.get(event.code)
if scanned_input == "\n":
line = self.buffer
self.buffer = ""
return line
self.buffer += scanned_input or ""
return None
except BlockingIOError:
if len(self.lines) <= 0:
return None
line = self.lines.popleft()
return line
class FifoScanner(object):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment