/ tests / halmodule.1 / stream_test.py
stream_test.py
 1  #!/usr/bin/env python
 2  import os
 3  os.system("halcmd unload all")
 4  import hal
 5  c = hal.component("stream_test")
 6  writer = hal.stream(c, hal.streamer_base, 10, "bfsu")
 7  c.ready()
 8  
 9  for i in range(9):
10      assert writer.writable
11      writer.write((i % 2, i, i, i))
12  assert not writer.writable
13  assert writer.num_overruns == 0
14  try:
15      writer.write((1,1,1,1))
16  except:
17      pass
18  else:
19      assert False, "failed to get exception on full stream"
20  assert writer.num_overruns == 1
21  
# In rtai realtime, it's only permitted to map a shared memory region
# once in a free-running component, so the writer and reader of a stream can't
# exist at the same time in the same process
# Drop the writer handle before a reader is attached to the same stream key.
del writer

reader = hal.stream(c, hal.streamer_base, "bfsu")
# The nine samples written earlier must come back in order, with the sample
# counter advancing by one per read and no underrun recorded along the way.
for i in range(9):
    assert reader.readable
    assert reader.read() == ((i % 2, i, i, i))
    assert reader.num_underruns == 0
    assert reader.sampleno == i+1
# Reading from the drained stream yields None and counts one underrun.
assert reader.read() is None
assert reader.num_underruns == 1

# Call form of print: same output on Python 2, and valid syntax on Python 3.
print("pass")