stream_test.py
1 #!/usr/bin/env python 2 import os 3 os.system("halcmd unload all") 4 import hal 5 c = hal.component("stream_test") 6 writer = hal.stream(c, hal.streamer_base, 10, "bfsu") 7 c.ready() 8 9 for i in range(9): 10 assert writer.writable 11 writer.write((i % 2, i, i, i)) 12 assert not writer.writable 13 assert writer.num_overruns == 0 14 try: 15 writer.write((1,1,1,1)) 16 except: 17 pass 18 else: 19 assert False, "failed to get exception on full stream" 20 assert writer.num_overruns == 1 21 22 # In rtai realtime, it's only permitted to map a shared memory region more 23 # once in a free-running component, so the writer and reader of a stream can't 24 # exist at the same time in the same process 25 del writer 26 27 reader = hal.stream(c, hal.streamer_base, "bfsu") 28 for i in range(9): 29 assert reader.readable 30 assert reader.read() == ((i % 2, i, i, i)) 31 assert reader.num_underruns == 0 32 assert reader.sampleno == i+1 33 assert reader.read() is None 34 assert reader.num_underruns == 1 35 36 print "pass"