CTC100.py
  1  import re
  2  import serial
  3  import time
  4  
  5  
  6  class CTC100Device:
  7      """
  8      Class to interact with the CTC100 Programmable Temperature Controller.
  9      Adjusted to align with Lake Shore 372 device methods for duck typing,
 10      while retaining all original functionality.
 11      """
 12  
 13      def __init__(self, address, name = None):
 14          try:
 15              self.port = address
 16              self.device = serial.Serial(
 17                  port=address,
 18                  timeout=0
 19              )
 20              time.sleep(1)
 21              self.address = address
 22              device_status = self.read_status()
 23              if not device_status:
 24                  raise Exception("No response from CTC100 device")
 25              self.input_channels = []
 26              self.output_channels = []
 27              self.aio_channels = []
 28              self.list_channels()
 29              self.name = name
 30              print(
 31                  f"Connected to CTC100 on {address} with input channels {self.input_channels}, "
 32                  f"output channels {self.output_channels}, and AIO channels {self.aio_channels}"
 33              )
 34          except Exception as e:
 35              print(f"Error initializing CTC100Device on port {address}: {e}")
 36              raise e  # Re-raise the exception so it can be caught in setup_devices()
 37  
 38      def write(self, command):
 39          """
 40          Send a command to the CTC100 over serial and read the response.
 41  
 42          :param command: Command string to send.
 43          :return: Response from the device.
 44          """
 45          self.device.write((command + "\n").encode())  # \n terminates commands
 46          # Read the response
 47          t1 = time.time()
 48          response = b''
 49          while True:
 50              response += self.device.read(self.device.in_waiting or 1)
 51              if response.endswith(b'\r\n'):
 52                  break
 53              t2 = time.time()
 54              if (t2 - t1) > 0.2:  # Timeout after 1 second
 55                  break
 56          # self.device.reset_input_buffer()
 57          # self.device.reset_output_buffer()
 58          return response
 59  
 60      def get_variable(self, var):
 61          """
 62          Read a parameter from the CTC100.
 63  
 64          :param var: Variable name.
 65          :return: Value of the variable.
 66          """
 67          var = var.replace(" ", "")  # Remove spaces from the variable name
 68          return self.write("{}?".format(var))
 69  
 70      def set_variable(self, var, val):
 71          """
 72          Set a parameter on the CTC100.
 73  
 74          :param var: Variable name.
 75          :param val: Value to set.
 76          :return: Response from the device.
 77          """
 78          var = var.replace(" ", "")
 79          val = "({})".format(val)
 80          return self.write("{} = {}".format(var, val))
 81  
 82      def increment_variable(self, var, val):
 83          """
 84          Increment a parameter on the CTC100.
 85  
 86          :param var: Variable name.
 87          :param val: Value to increment by.
 88          :return: Response from the device.
 89          """
 90          var = var.replace(" ", "")
 91          val = "({})".format(val)
 92          return self.write("{} += {}".format(var, val))
 93  
 94      def setAlarm(self, channel, Tmin, Tmax):
 95          """
 96          Enable and configure an alarm on a channel within specified temperature limits.
 97  
 98          :param channel: Channel number or name.
 99          :param Tmin: Minimum temperature limit.
100          :param Tmax: Maximum temperature limit.
101          """
102          if not isinstance(channel, str):
103              channel = f"In{channel}"
104  
105          self.set_variable(f"{channel}.Alarm.Sound", "4 beeps")
106          self.set_variable(f"{channel}.Alarm.Min", str(Tmin))
107          self.set_variable(f"{channel}.Alarm.Max", str(Tmax))
108          response = self.set_variable(f"{channel}.Alarm.Mode", "Level")
109          return response
110  
111      def disableAlarm(self, channel):
112          """
113          Disable the alarm on a channel.
114  
115          :param channel: Channel number or name.
116          """
117          if not isinstance(channel, str):
118              channel = f"In{channel}"
119  
120          response = self.set_variable(f"{channel}.Alarm.Mode", "Off")
121          return response
122  
123      def get_temperature(self, channel):
124          """
125          Read the temperature from an input or AIO channel.
126          
127          :param channel: Channel number or name.
128          :return: Temperature value.
129          """
130          try:
131              if not isinstance(channel, str):
132                  if channel in self.input_channels:
133                      channel = channel
134                  elif f"{channel}" in self.aio_channels:
135                      channel = f"{channel}"
136                  else:
137                      raise ValueError(f"Channel {channel} not found.")
138              temp = self.read(channel)
139              return temp
140          except Exception as e:
141              print(
142                  f"Error reading temperature from CTC100 (Channel {channel}): {e}")
143              return None
144  
145      def read(self, channel):
146          """
147          Read the value from an input or output channel.
148  
149          :param channel: Channel name (e.g., 'In1').
150          :return: The value read from the channel.
151          """
152          response = self.get_variable(f"{channel}.Value")
153          # Extract the numerical value from the response
154          match = re.search(r"[-+]?\d*\.\d+(?:[eE][-+]?\d+)?",
155                            response.decode("utf-8"))
156          if match is not None:
157              return float(match.group())
158          else:
159              raise RuntimeError(f"Unable to read from channel {channel}")
160  
161      def read_all_channels(self):
162          """
163          Read values from all input and AIO channels.
164  
165          :return: Dictionary with channel names as keys and readings as values.
166          """
167          readings = {}
168          for channel in self.input_channels + self.aio_channels:
169              temp = self.get_temperature(channel)
170              readings[channel] = temp
171          return readings
172  
173      def enable_heater(self):
174          """
175          Enable all heaters (outputs).
176          """
177          self.write("OutputEnable On")
178  
179      def disable_heater(self):
180          """
181          Disable all heaters (outputs).
182          """
183          self.write("OutputEnable Off")
184  
185      def set_heater_output(self, channel, value=0.0):
186          """
187          Set the heater output percentage in Manual mode.
188  
189          :param heater_number: Heater (output) channel number (1 or 2).
190          :param heat_percent: Output percentage to set (0-100%).
191          :return: True if successful, False otherwise.
192          """
193          try:
194              self.write(f'{channel}.Value {value}')
195              return True
196          except Exception as e:
197              print(f"Error setting heater output on CTC100: {e}")
198              return False
199  
200      def set_control_mode(self, channel, mode):
201          """
202          Set the control mode of an output channel.
203  
204          :param channel: Output channel number (1 or 2).
205          :param mode: Control mode ('Off', 'Manual', 'PID').
206          :return: Response from the device.
207          """
208          if mode not in ['Off', 'Manual', 'PID']:
209              raise ValueError(
210                  "Invalid control mode. Must be 'Off', 'Manual', or 'PID'.")
211          return self.set_variable(f"{channel}.Heater.Mode", mode)
212      
213      def set_PID_mode(self, channel, mode):
214          """
215          Set the control mode of an output channel.
216  
217          :param channel: Output channel number (1 or 2).
218          :param mode: Control mode ('Off', 'Manual', 'PID').
219          :return: Response from the device.
220          """
221          if mode not in ['Off', 'On', 'Follow']:
222              raise ValueError(
223                  "Invalid control mode. Must be 'Off', 'On', or 'Follow'.")
224          return self.set_variable(f"{channel}.PID.Mode", mode)
225  
226      def enable_PID(self, channel):
227          """
228          Enable PID control on an output channel.
229  
230          :param channel: Output channel number (1 or 2).
231          """
232          self.set_PID_mode(channel, 'On')
233  
234      def disable_PID(self, channel):
235          """
236          Disable PID control on an output channel.
237  
238          :param channel: Output channel number (1 or 2).
239          """
240          self.set_PID_mode(channel, 'Off')
241  
242      def write_setpoint(self, channel, setpoint):
243          """
244          Set the setpoint value of an output channel.
245  
246          :param channel: Output channel number (1 or 2).
247          :param setpoint: Setpoint value in Kelvin.
248          :return: Response from the device.
249          """
250          return self.set_variable(f"{channel}.PID.Setpoint", setpoint)
251  
252      def read_setpoint(self, channel):
253          """
254          Read the setpoint value of an output channel.
255  
256          :param channel: Output channel number (1 or 2).
257          :return: Setpoint value.
258          """
259          response = self.get_variable(f"Out{channel}.PID.Setpoint")
260          match = re.search(r"[-+]?\d*\.\d+(?:[eE][-+]?\d+)?",
261                            response.decode("utf-8"))
262          if match is not None:
263              return float(match.group())
264          else:
265              raise RuntimeError(f"Unable to read setpoint from Out{channel}")
266  
267      def tune_PID(self, channel, StepY, Lag):
268          """
269          Begin PID auto-tuning on an output channel.
270  
271          :param channel: Output channel number (1 or 2).
272          :param StepY: Heater power to apply during tuning (in Watts).
273          :param Lag: Duration of the tuning step (in seconds).
274          """
275          self.set_variable(f"{channel}.Tune.StepY", StepY)
276          self.set_variable(f"{channel}.Tune.Lag", Lag)
277  
278          # self.enable_heater()
279          self.set_variable(f"{channel}.Tune.Type", "Auto")
280          self.set_variable(f"{channel}.Tune.Mode", "Auto")  # Begin tuning
281  
282          # Sleep during the tuning process
283          time.sleep(Lag + 3*Lag)  # Adding extra time for safety
284  
285          # Check if tuning was successful
286          response = self.get_variable(f"{channel}.PID.Mode").decode()
287          if "On" in response:
288              print("PID tuning was successful! The parameters have been updated.")
289              self.disable_PID(channel)
290          else:
291              print("PID tuning failed! Try higher values for StepY or Lag.")
292  
293      def set_PID_parameters(self, channel, P, I, D):
294          """
295          Set the PID parameters (P, I, D values) on an output channel.
296  
297          :param channel: Output channel number (1 or 2).
298          :param P: Proportional gain.
299          :param I: Integral gain.
300          :param D: Derivative gain.
301          """
302          self.set_variable(f"Out{channel}.PID.P", P)
303          self.set_variable(f"Out{channel}.PID.I", I)
304          self.set_variable(f"Out{channel}.PID.D", D)
305          print(f"PID parameters for Out{channel} set to P={P}, I={I}, D={D}")
306  
307      def read_PID_parameters(self, channel):
308          """
309          Read the PID parameters (P, I, D values) from an output channel.
310  
311          :param channel: Output channel number (1 or 2).
312          :return: Dictionary containing 'P', 'I', 'D' parameters.
313          """
314          params = {}
315          for param in ['P', 'I', 'D']:
316              response = self.get_variable(f"Out{channel}.PID.{param}")
317              match = re.search(
318                  r"[-+]?\d*\.\d+(?:[eE][-+]?\d+)?", response.decode("utf-8"))
319              if match is not None:
320                  params[param] = float(match.group())
321              else:
322                  raise RuntimeError(
323                      f"Unable to read PID {param} from Out{channel}")
324          return params
325  
326      def read_status(self):
327          """
328          Read the device's status.
329  
330          :return: Device status information.
331          """
332          response = self.write('Status')
333          return response.decode().strip()
334  
335      def read_alarms(self):
336          """
337          Read active alarms on the device.
338  
339          :return: Alarm information.
340          """
341          response = self.write('Alarm')
342          return response.decode().strip()
343  
344      def list_channels(self):
345          try:
346              # Get all channel names using 'getOutput.names' command
347              response = self.get_variable('getOutput.names')
348              decoded_response = response.decode().strip()
349              decoded_response = decoded_response.replace('getOutput.names', '')
350              channel_names = [name.strip()
351                              for name in decoded_response.split(',') if name.strip()]
352  
353              
354              for i,name in enumerate(channel_names):
355                  if i <4:
356                      self.input_channels.append(name)
357                  elif i>3 and i<6:
358                      self.output_channels.append(name)
359                  else:
360                      self.aio_channels.append(name)
361                  
362          except Exception as e:
363              print(f"Error listing channels on CTC100: {e}")
364              raise e  # Re-raise the exception to indicate failure
365              # If any error occurs, default to predefined channels
366              self.input_channels = ['In1', 'In2']
367              self.output_channels = ['Out1', 'Out2']
368              self.aio_channels = ['AIO1', 'AIO2', 'AIO3', 'AIO4']
369  
370      def get_input_channels(self):
371          """
372          Get the list of input channels.
373  
374          :return: List of input channel names.
375          """
376          return self.input_channels
377  
378      def get_output_channels(self):
379          """
380          Get the list of output channels.
381  
382          :return: List of output channel names.
383          """
384          return self.output_channels
385  
386      def get_aio_channels(self):
387          """
388          Get the list of output channels.
389  
390          :return: List of output channel names.
391          """
392          return self.aio_channels
393      def link_heater_to_input(self, output_channel, input_channel):
394          """
395          Link a heater (output channel) with a thermometer (input channel) for PID control.
396  
397          :param output_channel: Output channel number (1 or 2).
398          :param input_channel: Input channel number or name.
399          :return: None.
400          """
401          # Ensure input channel format
402          if not isinstance(input_channel, str):
403              input_channel_name = f"{input_channel}"
404          else:
405              input_channel_name = input_channel
406  
407          # Construct the command without an '='
408          command = f"{output_channel}.PID.Input {input_channel_name}"
409          response = self.send_command(command)
410  
411          # Enable PID control
412          self.enable_PID(output_channel)
413          
414          print(response)
415          print(
416              f"Linked {output_channel} to {input_channel_name} for PID control.")
417          
418      def get_aio_iotype(self, channel):
419          """
420          Get the IOType of an AIO channel.
421  
422          :param channel: AIO channel number or name (e.g., 'AIO1' or 1).
423          :return: IOType of the channel ('Input', 'Set out', or 'Meas out').
424          """
425          if not isinstance(channel, str):
426              channel = f"{channel}"
427          response = self.get_variable(f"{channel}.IOType")
428          # Extract the IOType from the response
429          response_str = response.decode().strip()
430          match = re.search(rf"{channel}.IOType=(.*)", response_str)
431          if match:
432              iotype = match.group(1).strip()
433              return iotype
434          else:
435              # If response doesn't contain '=', try parsing the raw response
436              return response_str
437  
438      def set_aio_iotype(self, channel, iotype):
439          """
440          Set the IOType of an AIO channel.
441  
442          :param channel: AIO channel number or name (e.g., 'AIO1' or 1).
443          :param iotype: IOType to set ('Input', 'Set out', or 'Meas out').
444          :return: Response from the device.
445          """
446          valid_iotypes = ['Input', 'Set Out', 'Meas out']
447          if iotype not in valid_iotypes:
448              raise ValueError(
449                  f"Invalid IOType. Must be one of {valid_iotypes}.")
450          if not isinstance(channel, str):
451              channel = f"{channel}"
452          # Set the IOType using the appropriate format
453          response = self.set_variable(f"{channel}.IOType", f'"{iotype}"')
454          return response
455  
456      def get_aio_voltage(self, channel):
457          """
458          Get the voltage of an AIO channel configured as output ('Set out').
459  
460          :param channel: AIO channel number or name (e.g., 'AIO1' or 1).
461          :return: Voltage value in volts.
462          """
463          if not isinstance(channel, str):
464              channel = f"{channel}"
465          # Ensure the channel is configured as 'Set out'
466          iotype = self.get_aio_iotype(channel)
467          if iotype != 'Set out':
468              raise RuntimeError(
469                  f"{channel} is not configured as 'Set out'. Current IOType: {iotype}")
470          response = self.get_variable(f"{channel}.Value")
471          # Extract the voltage value from the response
472          match = re.search(r"[-+]?\d*\.\d+(?:[eE][-+]?\d+)?",
473                            response.decode("utf-8"))
474          if match:
475              voltage = float(match.group())
476              return voltage
477          else:
478              raise RuntimeError(f"Unable to read voltage from {channel}")
479  
480      def set_aio_voltage(self, channel, voltage):
481          """
482          Set the voltage of an AIO channel configured as output ('Set out').
483  
484          :param channel: AIO channel number or name (e.g., 'AIO1' or 1).
485          :param voltage: Voltage value to set (between -10 and +10 volts).
486          :return: Response from the device.
487          """
488          if not (-10.0 <= voltage <= 10.0):
489              raise ValueError("Voltage must be between -10 and +10 volts.")
490          if not isinstance(channel, str):
491              channel = f"{channel}"
492          # Ensure the channel is configured as 'Set out'
493          iotype = self.get_aio_iotype(channel)
494          if iotype != 'Set out':
495              raise RuntimeError(
496                  f"{channel} is not configured as 'Set out'. Current IOType: {iotype}")
497          # Set the voltage using the appropriate command
498          response = self.set_variable(f"{channel}.Value", voltage)
499          return response
500  
501      def send_command(self, command):
502          """
503          Send a custom command to the device and return the response.
504  
505          :param command: Custom command string.
506          :return: Response from the device.
507          """
508          response = self.write(command)
509          return response.decode().strip()
510  
511      def __del__(self):
512          """
513          Destructor to ensure the serial port is closed when the object is deleted.
514          """
515          try:
516              self.device.close()
517          except Exception:
518              pass
519