/ tests / mdi-queue-length / test-ui.py
test-ui.py
  1  #!/usr/bin/env python
  2  
  3  import linuxcnc
  4  import hal
  5  
  6  import math
  7  import time
  8  import sys
  9  import subprocess
 10  import os
 11  import signal
 12  import glob
 13  import re
 14  
 15  
 16  def wait_for_linuxcnc_startup(status, timeout=10.0):
 17  
 18      """Poll the Status buffer waiting for it to look initialized,
 19      rather than just allocated (all-zero).  Returns on success, throws
 20      RuntimeError on failure."""
 21  
 22      start_time = time.time()
 23      while time.time() - start_time < timeout:
 24          status.poll()
 25          if (status.angular_units == 0.0) \
 26              or (status.axes == 0) \
 27              or (status.axis_mask == 0) \
 28              or (status.cycle_time == 0.0) \
 29              or (status.exec_state != linuxcnc.EXEC_DONE) \
 30              or (status.interp_state != linuxcnc.INTERP_IDLE) \
 31              or (status.inpos == False) \
 32              or (status.linear_units == 0.0) \
 33              or (status.max_acceleration == 0.0) \
 34              or (status.max_velocity == 0.0) \
 35              or (status.program_units == 0.0) \
 36              or (status.rapidrate == 0.0) \
 37              or (status.state != linuxcnc.STATE_ESTOP) \
 38              or (status.task_state != linuxcnc.STATE_ESTOP):
 39              time.sleep(0.1)
 40          else:
 41              # looks good
 42              return
 43  
 44      # timeout, throw an exception
 45      raise RuntimeError
 46  
 47  
 48  def wait_for_mdi_queue(queue_len, timeout=10):
 49      start_time = time.time()
 50      while (time.time() - start_time) < timeout:
 51          s.poll()
 52          if s.queued_mdi_commands == queue_len:
 53              return
 54          time.sleep(0.1)
 55      print "queued_mdi_commands at %d after %.3f seconds" % (s.queued_mdi_commands, timeout)
 56      sys.exit(1)
 57  
 58  
 59  c = linuxcnc.command()
 60  s = linuxcnc.stat()
 61  e = linuxcnc.error_channel()
 62  
 63  
 64  h = hal.component("test-ui")
 65  h.newpin("digital-poker", hal.HAL_BIT, hal.HAL_OUT)
 66  h['digital-poker'] = False
 67  h.ready()
 68  
 69  hal.new_sig('poke', hal.HAL_BIT)
 70  hal.connect('motion.digital-in-00', 'poke')
 71  hal.connect('test-ui.digital-poker', 'poke')
 72  
 73  
 74  # Wait for LinuxCNC to initialize itself so the Status buffer stabilizes.
 75  wait_for_linuxcnc_startup(s)
 76  
 77  c.state(linuxcnc.STATE_ESTOP_RESET)
 78  c.state(linuxcnc.STATE_ON)
 79  c.home(-1)
 80  c.wait_complete()
 81  
 82  c.mode(linuxcnc.MODE_MDI)
 83  
 84  # At startup there's nothing in the queue.
 85  s.poll()
 86  assert(s.queued_mdi_commands == 0)
 87  
 88  # Block Motion from draining the queue, by asking it to wait for us to
 89  # poke a synchronized digital input.  Wait up to 30 seconds for digital
 90  # input 0 to go High.
 91  c.mdi('m66 p0 l3 q30')
 92  
 93  s.poll()
 94  assert(s.queued_mdi_commands == 0)
 95  
 96  # Put an MDI command on Task's MDI queue.
 97  c.mdi('g4 p0')
 98  
 99  s.poll()
100  assert(s.queued_mdi_commands == 1)
101  
102  # Add another MDI command that we can control.
103  # Wait up to 30 seconds for digital input 0 to go Low.
104  c.mdi('m66 p0 l4 q30')
105  
106  s.poll()
107  assert(s.queued_mdi_commands == 2)
108  
109  c.mdi('g4 p0')
110  
111  s.poll()
112  assert(s.queued_mdi_commands == 3)
113  
114  h['digital-poker'] = True
115  wait_for_mdi_queue(queue_len=1, timeout=10)
116  
117  h['digital-poker'] = False
118  wait_for_mdi_queue(queue_len=0, timeout=10)
119  
120  sys.exit(0)
121