1
0
mirror of https://github.com/xtacocorex/CHIP_IO synced 2025-07-21 13:23:21 +00:00

3 Commits

7 changed files with 155 additions and 64 deletions

View File

@ -1,3 +1,7 @@
0.1.0
----
* Fixed edge detection code, will trigger proper for callbacks now
0.0.9 0.0.9
---- ----
* Fixed SoftPWM segfault * Fixed SoftPWM segfault

View File

@ -20,7 +20,7 @@ classifiers = ['Development Status :: 3 - Alpha',
'Topic :: System :: Hardware'] 'Topic :: System :: Hardware']
setup(name = 'CHIP_IO', setup(name = 'CHIP_IO',
version = '0.0.9', version = '0.1.0',
author = 'Robert Wolterman', author = 'Robert Wolterman',
author_email = 'robert.wolterman@gmail.com', author_email = 'robert.wolterman@gmail.com',
description = 'A module to control CHIP IO channels', description = 'A module to control CHIP IO channels',

View File

@ -76,6 +76,6 @@ void define_constants(PyObject *module)
both_edge = Py_BuildValue("i", BOTH_EDGE); both_edge = Py_BuildValue("i", BOTH_EDGE);
PyModule_AddObject(module, "BOTH", both_edge); PyModule_AddObject(module, "BOTH", both_edge);
version = Py_BuildValue("s", "0.0.9"); version = Py_BuildValue("s", "0.1.0");
PyModule_AddObject(module, "VERSION", version); PyModule_AddObject(module, "VERSION", version);
} }

View File

@ -64,6 +64,7 @@ struct fdx *fd_list = NULL;
struct callback struct callback
{ {
int gpio; int gpio;
int edge;
void (*func)(int gpio); void (*func)(int gpio);
struct callback *next; struct callback *next;
}; };
@ -446,6 +447,74 @@ int gpio_set_edge(int gpio, unsigned int edge)
return 0; return 0;
} }
int open_edge_file(int gpio)
{
int fd;
char filename[MAX_FILENAME];
// create file descriptor of value file
snprintf(filename, sizeof(filename), "/sys/class/gpio/gpio%d/edge", gpio); BUF2SMALL(filename);
if ((fd = open(filename, O_RDONLY | O_NONBLOCK)) < 0) {
char err[256];
snprintf(err, sizeof(err), "open_edge_file: could not open '%s' (%s)", filename, strerror(errno));
add_error_msg(err);
return -1;
}
return fd;
} /* open_edge_file */
int gpio_get_edge(int gpio)
{
int fd;
int rtnedge = -1;
if ((fd = open_edge_file(gpio)) == -1) {
char err[256];
snprintf(err, sizeof(err), "gpio_get_value: could not open GPIO %d edge file", gpio);
add_error_msg(err);
return -1;
}
if (lseek(fd, 0, SEEK_SET) < 0) {
char err[256];
snprintf(err, sizeof(err), "gpio_get_value: could not seek GPIO %d (%s)", gpio, strerror(errno));
add_error_msg(err);
return -1;
}
char edge[16] = { 0 }; /* make sure read is null-terminated */
ssize_t s = read(fd, &edge, sizeof(edge) - 1);
close(fd);
while (s > 0 && edge[s-1] == '\n') { /* strip trailing newlines */
edge[s-1] = '\0';
s --;
}
if (s < 0) {
char err[256];
snprintf(err, sizeof(err), "gpio_get_value: could not read GPIO %d (%s)", gpio, strerror(errno));
add_error_msg(err);
return -1;
}
if (strcmp(edge, "rising") == 0)
{
rtnedge = 1;
}
else if (strcmp(edge, "falling") == 0)
{
rtnedge = 2;
}
else if (strcmp(edge, "both") == 0)
{
rtnedge = 3;
}
return rtnedge;
}
int gpio_lookup(int fd) int gpio_lookup(int fd)
{ {
struct fdx *f = fd_list; struct fdx *f = fd_list;
@ -466,7 +535,7 @@ void exports_cleanup(void)
gpio_unexport(exported_gpios->gpio); gpio_unexport(exported_gpios->gpio);
} }
int add_edge_callback(int gpio, void (*func)(int gpio)) int add_edge_callback(int gpio, int edge, void (*func)(int gpio))
{ {
struct callback *cb = callbacks; struct callback *cb = callbacks;
struct callback *new_cb; struct callback *new_cb;
@ -474,6 +543,7 @@ int add_edge_callback(int gpio, void (*func)(int gpio))
new_cb = malloc(sizeof(struct callback)); ASSRT(new_cb != NULL); new_cb = malloc(sizeof(struct callback)); ASSRT(new_cb != NULL);
new_cb->gpio = gpio; new_cb->gpio = gpio;
new_cb->edge = edge;
new_cb->func = func; new_cb->func = func;
new_cb->next = NULL; new_cb->next = NULL;
@ -495,7 +565,32 @@ void run_callbacks(int gpio)
while (cb != NULL) while (cb != NULL)
{ {
if (cb->gpio == gpio) if (cb->gpio == gpio)
cb->func(cb->gpio); {
int canrun = 0;
unsigned int value = 0;
gpio_get_value(gpio, &value);
// Both Edge
if (cb->edge == 3)
{
canrun = 1;
}
// Rising Edge
else if ((cb->edge == 1) && (value == 1))
{
canrun = 1;
}
// Falling Edge
else if ((cb->edge == 2) && (value == 0))
{
canrun = 1;
}
// Only run if we are allowed
if (canrun)
{
cb->func(cb->gpio);
}
}
cb = cb->next; cb = cb->next;
} }
} }

View File

@ -62,9 +62,14 @@ int gpio_get_direction(int gpio, unsigned int *value);
int gpio_set_value(int gpio, unsigned int value); int gpio_set_value(int gpio, unsigned int value);
int gpio_get_value(int gpio, unsigned int *value); int gpio_get_value(int gpio, unsigned int *value);
int gpio_set_edge(int gpio, unsigned int edge);
int open_edge_file(int gpio);
int gpio_get_edge(int gpio);
int add_edge_detect(int gpio, unsigned int edge); int add_edge_detect(int gpio, unsigned int edge);
void remove_edge_detect(int gpio); void remove_edge_detect(int gpio);
int add_edge_callback(int gpio, void (*func)(int gpio)); //int add_edge_callback(int gpio, void (*func)(int gpio));
int add_edge_callback(int gpio, int edge, void (*func)(int gpio));
int event_detected(int gpio); int event_detected(int gpio);
int gpio_event_add(int gpio); int gpio_event_add(int gpio);
int gpio_event_remove(int gpio); int gpio_event_remove(int gpio);

View File

@ -264,7 +264,7 @@ static void run_py_callbacks(int gpio)
} }
} }
static int add_py_callback(char *channel, int gpio, unsigned int bouncetime, PyObject *cb_func) static int add_py_callback(char *channel, int gpio, int edge, unsigned int bouncetime, PyObject *cb_func)
{ {
struct py_callback *new_py_cb; struct py_callback *new_py_cb;
struct py_callback *cb = py_callbacks; struct py_callback *cb = py_callbacks;
@ -294,7 +294,7 @@ static int add_py_callback(char *channel, int gpio, unsigned int bouncetime, PyO
cb = cb->next; cb = cb->next;
cb->next = new_py_cb; cb->next = new_py_cb;
} }
add_edge_callback(gpio, run_py_callbacks); add_edge_callback(gpio, edge, run_py_callbacks);
return 0; return 0;
} }
@ -343,7 +343,8 @@ static PyObject *py_add_event_callback(PyObject *self, PyObject *args, PyObject
PyErr_SetString(PyExc_RuntimeError, "Add event detection using add_event_detect first before adding a callback"); PyErr_SetString(PyExc_RuntimeError, "Add event detection using add_event_detect first before adding a callback");
return NULL; return NULL;
} }
if (add_py_callback(channel, gpio, bouncetime, cb_func) != 0) // Defaulting to Falling edge
if (add_py_callback(channel, gpio, 2, bouncetime, cb_func) != 0)
return NULL; return NULL;
Py_RETURN_NONE; Py_RETURN_NONE;
@ -410,7 +411,7 @@ static PyObject *py_add_event_detect(PyObject *self, PyObject *args, PyObject *k
} }
if (cb_func != NULL) if (cb_func != NULL)
if (add_py_callback(channel, gpio, bouncetime, cb_func) != 0) if (add_py_callback(channel, gpio, edge, bouncetime, cb_func) != 0)
return NULL; return NULL;
Py_RETURN_NONE; Py_RETURN_NONE;

View File

@ -4,13 +4,8 @@ import CHIP_IO.GPIO as GPIO
import time import time
import threading import threading
DO_APEINT1_TEST = False
DO_APEINT3_TEST = False
DO_XIOP2_TEST = True
num_callbacks = 0 num_callbacks = 0
def myfuncallback(channel): def myfuncallback(channel):
global num_callbacks global num_callbacks
num_callbacks += 1 num_callbacks += 1
@ -19,10 +14,9 @@ def myfuncallback(channel):
loopfunction_exit = False loopfunction_exit = False
def loopfunction(): def loopfunction():
print("LOOP FUNCTION") print("LOOP FUNCTION START")
for i in xrange(4): for i in xrange(6):
if loopfunction_exit: if loopfunction_exit:
break break
if i % 2: if i % 2:
@ -33,12 +27,16 @@ def loopfunction():
mystr = "SETTING CSID0 HIGH (i=%d)" % i mystr = "SETTING CSID0 HIGH (i=%d)" % i
print(mystr) print(mystr)
GPIO.output("CSID0", GPIO.HIGH) GPIO.output("CSID0", GPIO.HIGH)
print("SLEEPING") print(" LOOP FUNCTION SLEEPING")
time.sleep(1) time.sleep(1)
num_errs = 0 num_errs = 0
print("RUNNING GPIO SELF TEST") print("GETTING CHIP_IO VERSION")
mystr = "CHIP_IO VERSION: %s" % GPIO.VERSION
print(mystr)
print("\nRUNNING GPIO SELF TEST")
GPIO.selftest(0) GPIO.selftest(0)
print("\nVERIFYING SIMPLE FUNCTIONALITY") print("\nVERIFYING SIMPLE FUNCTIONALITY")
@ -73,70 +71,58 @@ GPIO.output("CSID0", GPIO.LOW)
print "LOW", GPIO.input("GPIO1") print "LOW", GPIO.input("GPIO1")
assert(GPIO.input("GPIO1") == GPIO.LOW) assert(GPIO.input("GPIO1") == GPIO.LOW)
print("SWAP GPIO WIRES FOR EDGE DETECTION TESTS AS NEEDED")
raw_input("PRESS ENTER WHEN READY TO START EDGE DETECTION TESTS")
# ============================================== # ==============================================
# EDGE DETECTION - AP-EINT1 # EDGE DETECTION - AP-EINT1
if DO_APEINT1_TEST: print("\nSETTING UP RISING EDGE DETECTION ON AP-EINT1")
print("\nSETTING UP RISING EDGE DETECTION ON AP-EINT1") GPIO.setup("AP-EINT1", GPIO.IN)
GPIO.setup("AP-EINT1", GPIO.IN) GPIO.add_event_detect("AP-EINT1", GPIO.RISING, myfuncallback)
GPIO.add_event_detect("AP-EINT1", GPIO.RISING) print("VERIFYING EDGE DETECT WAS SET PROPERLY")
f = open("/sys/class/gpio/gpio193/edge", "r")
print("VERIFYING EDGE DETECT") edge = f.read()
f = open("/sys/class/gpio/gpio193/edge", "r") f.close()
edge = f.read() assert(edge == "rising\n")
f.close() GPIO.remove_event_detect("AP-EINT1")
assert(edge == "rising\n")
GPIO.remove_event_detect("AP-EINT1")
# ============================================== # ==============================================
# EDGE DETECTION - AP-EINT3 # EDGE DETECTION - AP-EINT3
if DO_APEINT3_TEST: print("\nSETTING UP BOTH EDGE DETECTION ON AP-EINT3")
print("\nSETTING UP BOTH EDGE DETECTION ON AP-EINT3") GPIO.setup("AP-EINT3", GPIO.IN)
GPIO.setup("AP-EINT3", GPIO.IN) GPIO.add_event_detect("AP-EINT3", GPIO.BOTH, myfuncallback)
GPIO.add_event_detect("AP-EINT3", GPIO.BOTH) print("VERIFYING EDGE DETECT WAS SET PROPERLY")
f = open("/sys/class/gpio/gpio35/edge", "r")
print("VERIFYING EDGE DETECT") edge = f.read()
f = open("/sys/class/gpio/gpio35/edge", "r") f.close()
edge = f.read() assert(edge == "both\n")
f.close() GPIO.remove_event_detect("AP-EINT3")
assert(edge == "both\n")
GPIO.remove_event_detect("AP-EINT3")
# ============================================== # ==============================================
# EDGE DETECTION - EXPANDED GPIO # EDGE DETECTION - EXPANDED GPIO
if DO_XIOP2_TEST: print("\nSETTING UP FALLING EDGE DETECTION ON XIO-P2")
print("\nSETTING UP FALLING EDGE DETECTION ON XIO-P2") GPIO.add_event_detect("XIO-P2", GPIO.FALLING, myfuncallback)
GPIO.add_event_detect("XIO-P2", GPIO.FALLING, myfuncallback)
print("VERIFYING EDGE DETECT") print("VERIFYING EDGE DETECT")
base = GPIO.get_gpio_base() base = GPIO.get_gpio_base()
gfile = "/sys/class/gpio/gpio%d/edge" % (base + 2) gfile = "/sys/class/gpio/gpio%d/edge" % (base + 2)
f = open(gfile, "r") f = open(gfile, "r")
edge = f.read() edge = f.read()
f.close() f.close()
assert(edge == "falling\n") assert(edge == "falling\n")
# LOOP WRITING ON CSID0 TO HOPEFULLY GET CALLBACK TO WORK # LOOP WRITING ON CSID0 TO HOPEFULLY GET CALLBACK TO WORK
print("WAITING FOR CALLBACKS") print("WAITING FOR CALLBACKS ON XIO-P2")
loopfunction() loopfunction()
mystr = " num_callbacks = %d" % num_callbacks mystr = " num_callbacks = %d" % num_callbacks
print(mystr) print(mystr)
print("PRESS CONTROL-C TO EXIT IF SCRIPT GETS STUCK")
GPIO.remove_event_detect("XIO-P2") GPIO.remove_event_detect("XIO-P2")
print("\nWAIT FOR EDGE TESTING")
print("PRESS CONTROL-C TO EXIT IF SCRIPT GETS STUCK")
try: try:
# WAIT FOR EDGE # WAIT FOR EDGE
t = threading.Thread(target=loopfunction) t = threading.Thread(target=loopfunction)
t.start() t.start()
print("WAITING FOR EDGE") print("WAITING FOR EDGE ON XIO-P2")
if DO_APEINT1_TEST: GPIO.wait_for_edge("XIO-P2", GPIO.FALLING)
GPIO.wait_for_edge("AP-EINT1", GPIO.FALLING)
if DO_APEINT3_TEST:
GPIO.wait_for_edge("AP-EINT3", GPIO.FALLING)
if DO_XIOP2_TEST:
GPIO.wait_for_edge("XIO-P2", GPIO.FALLING)
# THIS SHOULD ONLY PRINT IF WE'VE HIT THE EDGE DETECT # THIS SHOULD ONLY PRINT IF WE'VE HIT THE EDGE DETECT
print("WE'VE FALLEN LIKE COOLIO'S CAREER") print("WE'VE FALLEN LIKE COOLIO'S CAREER")
except: except: