qt_action.py
1 import linuxcnc 2 import hal 3 4 # Set up logging 5 import logger 6 log = logger.getLogger(__name__) 7 # log.setLevel(logger.INFO) # One of DEBUG, INFO, WARNING, ERROR, CRITICAL 8 9 from qtvcp.core import Status, Info 10 INFO = Info() 11 STATUS = Status() 12 13 ################################################################ 14 # Action class 15 ################################################################ 16 class _Lcnc_Action(object): 17 def __init__(self): 18 # only initialize once for all instances 19 if self.__class__._instanceNum >=1: 20 return 21 self.__class__._instanceNum += 1 22 self.cmd = linuxcnc.command() 23 self.tmp = None 24 self.prefilter_path = None 25 self.home_all_warning_flag = False 26 27 def SET_ESTOP_STATE(self, state): 28 if state: 29 self.cmd.state(linuxcnc.STATE_ESTOP) 30 else: 31 self.cmd.state(linuxcnc.STATE_ESTOP_RESET) 32 33 def SET_MACHINE_STATE(self, state): 34 if state: 35 self.cmd.state(linuxcnc.STATE_ON) 36 else: 37 self.cmd.state(linuxcnc.STATE_OFF) 38 39 def SET_MACHINE_HOMING(self, joint): 40 self.ensure_mode(linuxcnc.MODE_MANUAL) 41 self.cmd.teleop_enable(False) 42 if not INFO.HOME_ALL_FLAG and joint == -1: 43 if not self.home_all_warning_flag == True: 44 self.home_all_warning_flag = True 45 STATUS.emit('error',linuxcnc.NML_ERROR, 46 ''''Home-all not available according to INI Joint Home sequence 47 Set the joint sequence in the INI or 48 modify the screen for individual home buttons 49 to avoid this warning 50 Press again to home Z axis Joint''') 51 else: 52 if STATUS.is_all_homed(): 53 self.home_all_warning_flag = False 54 return 55 # so linuxcnc is misonfigured or the Screen is built wrong (needs individual home buttons) 56 # now we will fake individual home buttons by homing joints one at a time 57 # but always start will Z - on a mill it's safer 58 zj = INFO.GET_JOG_FROM_NAME['Z'] 59 if not STATUS.stat.homed[zj]: 60 log.info('Homing Joint: {}'.format(zj)) 61 self.cmd.home(zj) 62 STATUS.emit('error',linuxcnc.NML_ERROR, 63 ''''Home-all not available according to INI Joint Home sequence 64 Press again to home next Joint''') 65 return 66 length = len(INFO.JOINTSEQUENCELIST) 67 for num,j in enumerate(INFO.JOINTSEQUENCELIST): 68 print j, num, len(INFO.JOINTSEQUENCELIST) 69 # at the end so all homed 70 if num == length -1: 71 self.home_all_warning_flag = False 72 # one from end but end is already homed 73 if num == length -2 and STATUS.stat.homed[zj]: 74 self.home_all_warning_flag = False 75 # Z joint is homed first outside this for loop 76 if j == zj : continue 77 # ok home it then stop and wait for next button push 78 if not STATUS.stat.homed[j]: 79 log.info('Homing Joint: {}'.format(j)) 80 self.cmd.home(j) 81 if self.home_all_warning_flag: 82 STATUS.emit('error',linuxcnc.NML_ERROR, 83 ''''Home-all not available according to INI Joint Home sequence 84 Press again to home next Joint''') 85 break 86 else: 87 log.info('Homing Joint: {}'.format(joint)) 88 self.cmd.home(joint) 89 90 def SET_MACHINE_UNHOMED(self, joint): 91 self.ensure_mode(linuxcnc.MODE_MANUAL) 92 self.cmd.teleop_enable(False) 93 #self.cmd.traj_mode(linuxcnc.TRAJ_MODE_FREE) 94 self.cmd.unhome(joint) 95 96 def SET_AUTO_MODE(self): 97 self.ensure_mode(linuxcnc.MODE_AUTO) 98 99 # if called while on hard limit will set the flag and allow machine on 100 # if called with flag set and now off hard limits - resets the flag 101 def TOGGLE_LIMITS_OVERRIDE(self): 102 if STATUS.is_limits_override_set() and STATUS.is_hard_limits_tripped(): 103 STATUS.emit('error',linuxcnc.OPERATOR_ERROR,'''Can Not Reset Limits Override - Still On Hard Limits''') 104 elif not STATUS.is_limits_override_set() and STATUS.is_hard_limits_tripped(): 105 STATUS.emit('error',linuxcnc.OPERATOR_ERROR,'Hard Limits Are Overridden!') 106 self.cmd.override_limits() 107 else: 108 STATUS.emit('error',linuxcnc.OPERATOR_TEXT,'Hard Limits Are Reset To Active!') 109 self.cmd.override_limits() 110 111 def SET_MDI_MODE(self): 112 self.ensure_mode(linuxcnc.MODE_MDI) 113 114 def SET_MANUAL_MODE(self): 115 self.ensure_mode(linuxcnc.MODE_MANUAL) 116 117 def CALL_MDI(self, code): 118 self.ensure_mode(linuxcnc.MODE_MDI) 119 self.cmd.mdi('%s'%code) 120 121 def CALL_MDI_WAIT(self, code, time = 5): 122 log.debug('MDI_WAIT_COMMAND= {}, maxt = {}'.format(code, time)) 123 self.ensure_mode(linuxcnc.MODE_MDI) 124 for l in code.split("\n"): 125 log.debug('MDI_COMMAND: {}'.format(l)) 126 self.cmd.mdi( l ) 127 result = self.cmd.wait_complete(time) 128 if result == -1: 129 log.debug('MDI_COMMAND_WAIT timeout past {} sec. Error: {}'.format( time, result)) 130 #STATUS.emit('MDI time out error',) 131 self.ABORT() 132 return -1 133 elif result == linuxcnc.RCS_ERROR: 134 log.debug('MDI_COMMAND_WAIT RCS error: {}'.format( time, result)) 135 #STATUS.emit('MDI time out error',) 136 return -1 137 result = linuxcnc.error_channel().poll() 138 if result: 139 STATUS.emit('error',result[0],result[1]) 140 log.error('MDI_COMMAND_WAIT Error channel: {}'.format(result[1])) 141 return -1 142 return 0 143 144 def CALL_INI_MDI(self, number): 145 try: 146 mdi = INFO.MDI_COMMAND_LIST[number] 147 except: 148 log.error('MDI_COMMAND= # {} Not found under [MDI_COMMAND_LIST] in INI file'.format(number)) 149 return 150 mdi_list = mdi.split(';') 151 self.ensure_mode(linuxcnc.MODE_MDI) 152 for code in(mdi_list): 153 self.cmd.mdi('%s'% code) 154 155 def CALL_OWORD(self, code, time=5 ): 156 log.debug('OWORD_COMMAND= {}'.format(code)) 157 self.ensure_mode(linuxcnc.MODE_MDI) 158 self.cmd.mdi(code) 159 STATUS.stat.poll() 160 while STATUS.stat.exec_state == linuxcnc.EXEC_WAITING_FOR_MOTION_AND_IO or \ 161 STATUS.stat.exec_state == linuxcnc.EXEC_WAITING_FOR_MOTION: 162 result = self.cmd.wait_complete(time) 163 if result == -1: 164 log.error('Oword timeout oast () Error = # {}'.format(time, result)) 165 self.ABORT() 166 return -1 167 elif result == linuxcnc.RCS_ERROR: 168 log.error('Oword RCS Error = # {}'.format(result)) 169 return -1 170 result = linuxcnc.error_channel().poll() 171 if result: 172 STATUS.emit('error',result[0],result[1]) 173 log.error('Oword Error: {}'.format(result[1])) 174 return -1 175 STATUS.stat.poll() 176 result = self.cmd.wait_complete(time) 177 if result == -1 or result == linuxcnc.RCS_ERROR or linuxcnc.error_channel().poll(): 178 log.error('Oword RCS Error = # {}'.format(result)) 179 return -1 180 result = linuxcnc.error_channel().poll() 181 if result: 182 STATUS.emit('error',result[0],result[1]) 183 log.error('Oword Error: {}'.format(result[1])) 184 return -1 185 log.debug('OWORD_COMMAND returns complete : {}'.format(result)) 186 return 0 187 188 def UPDATE_VAR_FILE(self): 189 self.ensure_mode(linuxcnc.MODE_MANUAL) 190 self.ensure_mode(linuxcnc.MODE_MDI) 191 192 def OPEN_PROGRAM(self, fname): 193 self.prefilter_path = str(fname) 194 self.ensure_mode(linuxcnc.MODE_AUTO) 195 old = STATUS.stat.file 196 flt = INFO.get_filter_program(str(fname)) 197 198 if flt: 199 log.debug('get {} filtered program {}'.format(flt,fname)) 200 self.open_filter_program(str(fname), flt) 201 else: 202 log.debug( 'Load program {}'.format(fname)) 203 self.cmd.program_open(str(fname)) 204 205 # STATUS can't tell if we are loading the same file. 206 # for instance if you edit a file then load it again. 207 # so we check for it and force an update: 208 if old == fname: 209 STATUS.emit('file-loaded',fname) 210 211 def SAVE_PROGRAM(self, source, fname): 212 if source == '': return 213 if '.' not in fname: 214 fname += '.ngc' 215 name, ext = fname.rsplit('.') 216 try: 217 outfile = open(name + '.' + ext.lower(),'w') 218 outfile.write(source) 219 STATUS.emit('update-machine-log', 'Saved: ' + fname, 'TIME') 220 except Exception as e: 221 print e 222 finally: 223 outfile.close() 224 225 def SET_AXIS_ORIGIN(self,axis,value): 226 m = "G10 L20 P0 %s%f"%(axis,value) 227 fail, premode = self.ensure_mode(linuxcnc.MODE_MDI) 228 self.cmd.mdi(m) 229 self.cmd.wait_complete() 230 self.ensure_mode(premode) 231 self.RELOAD_DISPLAY() 232 233 # Adjust tool offsets so current position ends up the given value 234 def SET_TOOL_OFFSET(self,axis,value,fixture = False): 235 lnum = 10+int(fixture) 236 m = "G10 L%d P%d %s%f"%(lnum, STATUS.stat.tool_in_spindle, axis, value) 237 fail, premode = self.ensure_mode(linuxcnc.MODE_MDI) 238 self.cmd.mdi(m) 239 self.cmd.wait_complete() 240 self.cmd.mdi("G43") 241 self.cmd.wait_complete() 242 self.ensure_mode(premode) 243 self.RELOAD_DISPLAY() 244 245 # Set actual tool offset in tool table to the given value 246 def SET_DIRECT_TOOL_OFFSET(self,axis,value): 247 m = "G10 L1 P%d %s%f"%( STATUS.get_current_tool(), axis, value) 248 fail, premode = self.ensure_mode(linuxcnc.MODE_MDI) 249 self.cmd.mdi(m) 250 self.cmd.wait_complete() 251 self.cmd.mdi("G43") 252 self.cmd.wait_complete() 253 self.ensure_mode(premode) 254 self.RELOAD_DISPLAY() 255 256 def RUN(self, line=0): 257 if not STATUS.is_auto_mode(): 258 self.ensure_mode(linuxcnc.MODE_AUTO) 259 if STATUS.is_auto_paused() and line == 0: 260 self.cmd.auto(linuxcnc.AUTO_STEP) 261 return 262 elif not STATUS.is_auto_running(): 263 self.cmd.auto(linuxcnc.AUTO_RUN,line) 264 265 def STEP(self): 266 if STATUS.is_auto_running() and not STATUS.is_auto_paused(): 267 self.cmd.auto(linuxcnc.AUTO_PAUSE) 268 return 269 if STATUS.is_auto_paused(): 270 self.cmd.auto(linuxcnc.AUTO_STEP) 271 return 272 273 def ABORT(self): 274 self.ensure_mode(linuxcnc.MODE_AUTO) 275 self.cmd.abort() 276 277 def PAUSE(self): 278 if not STATUS.stat.paused: 279 self.cmd.auto(linuxcnc.AUTO_PAUSE) 280 else: 281 log.debug('resume') 282 self.cmd.auto(linuxcnc.AUTO_RESUME) 283 284 def SET_MAX_VELOCITY_RATE(self, rate): 285 self.cmd.maxvel(rate/60.0) 286 def SET_RAPID_RATE(self, rate): 287 self.cmd.rapidrate(rate/100.0) 288 def SET_FEED_RATE(self, rate): 289 self.cmd.feedrate(rate/100.0) 290 def SET_SPINDLE_RATE(self, rate, number = 0): 291 self.cmd.spindleoverride(rate/100.0, number) 292 def SET_JOG_RATE(self, rate): 293 STATUS.set_jograte(float(rate)) 294 def SET_JOG_RATE_ANGULAR(self, rate): 295 STATUS.set_jograte_angular(float(rate)) 296 def SET_JOG_INCR(self, incr, text): 297 STATUS.set_jog_increments(incr, text) 298 def SET_JOG_INCR_ANGULAR(self, incr, text): 299 STATUS.set_jog_increment_angular(incr, text) 300 301 def SET_SPINDLE_ROTATION(self, direction = 1, rpm = 100, number = 0): 302 self.cmd.spindle(direction, rpm, number) 303 def SET_SPINDLE_FASTER(self, number = 0): 304 self.cmd.spindle(linuxcnc.SPINDLE_INCREASE, number) 305 def SET_SPINDLE_SLOWER(self, number = 0): 306 self.cmd.spindle(linuxcnc.SPINDLE_DECREASE, number) 307 def SET_SPINDLE_STOP(self, number = 0): 308 self.cmd.spindle(linuxcnc.SPINDLE_OFF, number) 309 310 def SET_USER_SYSTEM(self, system): 311 systemnum = str(system).strip('gG') 312 if systemnum in('54', '55', '56', '57', '58', '59', '59.1', '59.2', '59.3'): 313 fail, premode = self.ensure_mode(linuxcnc.MODE_MDI) 314 self.cmd.mdi('G'+systemnum) 315 self.cmd.wait_complete() 316 self.ensure_mode(premode) 317 318 def ZERO_G92_OFFSET(self): 319 self.CALL_MDI("G92.1") 320 self.RELOAD_DISPLAY() 321 def ZERO_ROTATIONAL_OFFSET(self): 322 self.CALL_MDI("G10 L2 P0 R 0") 323 self.RELOAD_DISPLAY() 324 def ZERO_G5X_OFFSET(self, num): 325 fail, premode = self.ensure_mode(linuxcnc.MODE_MDI) 326 clear_command = "G10 L2 P%d R0" % num 327 for a in INFO.AVAILABLE_AXES: 328 clear_command += " %c0" % a 329 self.cmd.mdi('%s'% clear_command) 330 self.cmd.wait_complete() 331 self.ensure_mode(premode) 332 self.RELOAD_DISPLAY() 333 334 def RECORD_CURRENT_MODE(self): 335 mode = STATUS.get_current_mode() 336 self.last_mode = mode 337 return mode 338 339 def RESTORE_RECORDED_MODE(self): 340 self.ensure_mode(self.last_mode) 341 342 def SET_SELECTED_JOINT(self, data): 343 if isinstance(data, (int, long)): 344 STATUS.set_selected_joint(data) 345 else: 346 log.error( 'Selected joint must be an integer: {}'.format(data)) 347 348 def SET_SELECTED_AXIS(self, data): 349 if isinstance(data, (str)): 350 STATUS.set_selected_axis(data) 351 else: 352 log.error( 'Selected axis must be a string: {}'.format(data)) 353 354 # jog based on STATUS's rate and distance 355 # use joint number for joint or letter for axis jogging 356 def DO_JOG(self, joint_axis, direction): 357 angular = False 358 if isinstance(joint_axis, (int, long)): 359 if STATUS.stat.joint[joint_axis]['jointType'] == linuxcnc.ANGULAR: 360 angular = True 361 jointnum = joint_axis 362 else: 363 if joint_axis.upper() in('A','B','C'): 364 angular = True 365 s ='XYZABCUVW' 366 jointnum = s.find(joint_axis) 367 # Get jog rate 368 if angular: 369 distance = STATUS.get_jog_increment_angular() 370 rate = STATUS.get_jograte_angular()/60 371 else: 372 distance = STATUS.get_jog_increment() 373 rate = STATUS.get_jograte()/60 374 self.JOG(jointnum, direction, rate, distance) 375 376 # jog based on given variables 377 # checks for jog joint mode first 378 def JOG(self, jointnum, direction, rate, distance=0): 379 jjogmode,j_or_a = self.get_jog_info(jointnum) 380 if jjogmode is None or j_or_a is None: 381 return 382 if direction == 0: 383 self.cmd.jog(linuxcnc.JOG_STOP, jjogmode, j_or_a) 384 else: 385 if distance == 0: 386 self.cmd.jog(linuxcnc.JOG_CONTINUOUS, jjogmode, j_or_a, direction * rate) 387 else: 388 self.cmd.jog(linuxcnc.JOG_INCREMENT, jjogmode, j_or_a, direction * rate, distance) 389 390 def TOGGLE_FLOOD(self): 391 self.cmd.flood(not(STATUS.stat.flood)) 392 def SET_FLOOD_ON(self): 393 self.cmd.flood(1) 394 def SET_FLOOD_OFF(self): 395 self.cmd.flood(0) 396 397 def TOGGLE_MIST(self): 398 self.cmd.mist(not(STATUS.stat.mist)) 399 def SET_MIST_ON(self): 400 self.cmd.mist(1) 401 def SET_MIST_OFF(self): 402 self.cmd.mist(0) 403 404 def RELOAD_TOOLTABLE(self): 405 self.cmd.load_tool_table() 406 407 def TOGGLE_OPTIONAL_STOP(self): 408 self.cmd.set_optional_stop(not(STATUS.stat.optional_stop)) 409 def SET_OPTIONAL_STOP_ON(self): 410 self.cmd.set_optional_stop(True) 411 def SET_OPTIONAL_STOP_OFF(self): 412 self.cmd.set_optional_stop(False) 413 414 def TOGGLE_BLOCK_DELETE(self): 415 self.cmd.set_block_delete(not(STATUS.stat.block_delete)) 416 def SET_BLOCK_DELETE_ON(self): 417 self.cmd.set_block_delete(True) 418 def SET_BLOCK_DELETE_OFF(self): 419 self.cmd.set_block_delete(False) 420 421 def RELOAD_DISPLAY(self): 422 STATUS.emit('reload-display') 423 424 def SET_GRAPHICS_VIEW(self, view): 425 if view.lower() in('x', 'y', 'y2', 'z', 'z2', 'p', 'clear', 426 'zoom-in','zoom-out','pan-up','pan-down', 427 'pan-left','pan-right','rotate-up', 428 'rotate-down', 'rotate-cw','rotate-ccw'): 429 STATUS.emit('view-changed',view) 430 431 def SHUT_SYSTEM_DOWN_PROMPT(self): 432 import subprocess 433 try: 434 try: 435 subprocess.call('gnome-session-quit --power-off',shell=True) 436 except: 437 try: 438 subprocess.call('xfce4-session-logout', shell=True) 439 except: 440 try: 441 subprocess.call('systemctl poweroff', shell=True) 442 except: 443 raise 444 except Exception as e: 445 log.warning("Couldn't shut system down: {}".format(e)) 446 447 def SHUT_SYSTEM_DOWN_NOW(self): 448 import subprocess 449 subprocess.call('shutdown now') 450 451 ###################################### 452 # Action Helper functions 453 ###################################### 454 455 # In free (joint) mode we use the plain joint number. 456 # In axis mode we convert the joint number to the equivalent 457 # axis number 458 def get_jog_info (self,num): 459 if STATUS.stat.motion_mode == linuxcnc.TRAJ_MODE_FREE: 460 return True, self.jnum_check(num) 461 return False, num 462 463 def jnum_check(self,num): 464 if STATUS.stat.kinematics_type != linuxcnc.KINEMATICS_IDENTITY: 465 log.warning("Joint jogging not supported for non-identity kinematics") 466 #return None 467 if num > INFO.JOINT_COUNT: 468 log.error("Computed joint number={} exceeds jointcount={}".format(num,INFO.JOINT_COUNT)) 469 # decline to jog 470 return None 471 if num not in INFO.AVAILABLE_JOINTS: 472 log.warning("Joint {} is not in available joints {}".format(num, INFO.AVAILABLE_JOINTS)) 473 return None 474 return num 475 476 def ensure_mode(self, *modes): 477 truth, premode = STATUS.check_for_modes(modes) 478 if truth is False: 479 self.cmd.mode(modes[0]) 480 self.cmd.wait_complete() 481 return (True, premode) 482 else: 483 return (truth, premode) 484 485 def open_filter_program(self,fname, flt): 486 log.debug('Opening filtering program yellow<{}> for {}'.format(flt,fname)) 487 if not self.tmp: 488 self._mktemp() 489 tmp = os.path.join(self.tmp, os.path.basename(fname)) 490 flt = FilterProgram(flt, fname, tmp, lambda r: r or self._load_filter_result(tmp)) 491 492 def _load_filter_result(self, fname): 493 old = STATUS.stat.file 494 log.debug( 'Load filtered program {}'.format(fname)) 495 self.cmd.program_open(str(fname)) 496 497 # STATUS can't tell if we are loading the same file. 498 # for instance if you edit a file then load it again. 499 # so we check for it and force an update: 500 if old == fname: 501 STATUS.emit('file-loaded',fname) 502 503 def _mktemp(self): 504 if self.tmp: 505 return 506 self.tmp = tempfile.mkdtemp(prefix='emcflt-', suffix='.d') 507 atexit.register(lambda: shutil.rmtree(self.tmp)) 508 509 ############################# 510 ########################################### 511 # Filter Class 512 ######################################################################## 513 import os, sys, time, select, re 514 import tempfile, atexit, shutil 515 516 # slightly reworked code from gladevcp 517 # loads a filter program and collects the result 518 progress_re = re.compile("^FILTER_PROGRESS=(\\d*)$") 519 class FilterProgram: 520 def __init__(self, program_filter, infilename, outfilename, callback=None): 521 import subprocess 522 outfile = open(outfilename, "w") 523 infilename_q = infilename.replace("'", "'\\''") 524 env = dict(os.environ) 525 env['AXIS_PROGRESS_BAR'] = '1' 526 p = subprocess.Popen(["sh", "-c", "%s '%s'" % (program_filter, infilename_q)], 527 stdin=subprocess.PIPE, 528 stdout=outfile, 529 stderr=subprocess.PIPE, 530 env=env) 531 p.stdin.close() # No input for you 532 self.p = p 533 self.stderr_text = [] 534 self.program_filter = program_filter 535 self.callback = callback 536 self.gid = STATUS.connect('periodic', self.update) 537 #progress = Progress(1, 100) 538 #progress.set_text(_("Filtering...")) 539 540 def update(self, w): 541 if self.p.poll() is not None: 542 self.finish() 543 STATUS.disconnect(self.gid) 544 return False 545 546 r,w,x = select.select([self.p.stderr], [], [], 0) 547 if not r: 548 return True 549 stderr_line = self.p.stderr.readline() 550 m = progress_re.match(stderr_line) 551 if m: 552 pass #progress.update(int(m.group(1)), 1) 553 else: 554 self.stderr_text.append(stderr_line) 555 sys.stderr.write(stderr_line) 556 return True 557 558 def finish(self): 559 # .. might be something left on stderr 560 for line in self.p.stderr: 561 m = progress_re.match(line) 562 if not m: 563 self.stderr_text.append(line) 564 sys.stderr.write(line) 565 r = self.p.returncode 566 if r: 567 self.error(r, "".join(self.stderr_text)) 568 if self.callback: 569 self.callback(r) 570 571 # pop a (probable) dialog box 572 def error(self, exitcode, stderr): 573 message = _('The filter program %(program)r exited with an error')% {'program': self.program_filter} 574 more = _("Any error messages it produced are shown below:") 575 mess = {'NAME':'MESSAGE','ID':'ACTION_ERROR__', 576 'MESSAGE':message, 577 'MORE': more, 578 'DETAILS':stderr, 579 'ICON':'CRITICAL', 580 'FOCUS_TEXT': _('Filter program Error'), 581 'TITLE':'Program Filter Error'} 582 STATUS.emit('dialog-request', mess) 583 log.error('Filter Program Error:{}'.format (stderr)) 584 585