07-11-2025 07:37 PM
I have a PCIe-6321 with a BNC-2110 breakout box. I am trying to write a series of analog output values to one device, one value on every falling digital edge I detect from another device. I have code that does this using the nidaqmx package in Python, but it runs much slower than I would expect.
The falling digital edge occurs roughly every 125 microseconds; I have confirmed this with an oscilloscope. I have around 350 voltages I need to write to the analog output task. I would therefore expect the entire process to take about 45 milliseconds. I'm measuring this process to be around 800 milliseconds on my oscilloscope, which is far slower than expected.
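As a quick check on those numbers, here is a small sketch of the arithmetic. The variable names are only illustrative; the values are the ones quoted above (125 microseconds per edge, roughly 350 voltages, about 800 milliseconds measured).

```python
# Expected vs. measured scan time, using the figures quoted in the post.
EDGE_PERIOD_S = 125e-6   # one falling edge roughly every 125 microseconds
N_VOLTAGES = 350         # approximate number of voltages to write
MEASURED_S = 0.800       # total duration seen on the oscilloscope

expected_s = N_VOLTAGES * EDGE_PERIOD_S        # time if every edge gets one update
per_update_s = MEASURED_S / N_VOLTAGES         # average time actually spent per update
edges_per_update = per_update_s / EDGE_PERIOD_S

print(f"expected total:   {expected_s * 1e3:.2f} ms")
print(f"measured/update:  {per_update_s * 1e3:.2f} ms")
print(f"edges per update: {edges_per_update:.1f}")
```

If these figures are right, each software-timed write takes on the order of a couple of milliseconds, so many edges pass between updates rather than one.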
I'm also receiving an error in the Python terminal saying that there are too many open files (zmq.error.ZMQError: Too many open files). I am not sure whether the program is slow because of this file issue, which I think is caused by task buffering, or because I'm writing the analog output and reading the digital input in an inefficient way to begin with.
Right now I have set up the digital input task with change detection timing and registered a change detection event for every falling edge. The event runs a callback function which writes the next voltage to the analog output port. Once all voltages are written, this function stops and closes the tasks. To keep the tasks running while there are still voltages to be written, I check whether the digital input task is done. If I don't do this, the program closes immediately.
Is there a better way to do this? Has anyone had the same Python error as me while doing something like this? Thanks for any insight!
import nidaqmx
from nidaqmx.constants import AcquisitionType, Signal, VoltageUnits

def sync_test():
    print("Starting...")
    global i
    i=1
    voltages = get_galvo_voltages() # function which returns an array of the voltages I must set the analog output port (galvo mirror) to
    di_task = nidaqmx.Task()
    ao_task = nidaqmx.Task()
    def callback(task_handle=di_task._handle, signal_type=Signal.CHANGE_DETECTION_EVENT, callback_data=0):
        global i
        if (i<len(voltages)):
            ao_task.write(voltages[i],timeout=0)
            i+=1
            return
        else:
            di_task.stop()
            di_task.close()
            ao_task.stop()
            ao_task.close()
            raise DoneScanning
    di_task.di_channels.add_di_chan("Dev1/port0/line0") # resonant mirror voltage output
    di_task.timing.cfg_change_detection_timing(falling_edge_chan="Dev1/port0/line0",sample_mode=AcquisitionType.CONTINUOUS)
    di_task.register_signal_event(Signal.CHANGE_DETECTION_EVENT, callback)
    ao_task.ao_channels.add_ao_voltage_chan("Dev1/ao1",min_val=-9.5,max_val=9.5,units=VoltageUnits.VOLTS)
    ao_task.write(voltages[0]) # setting the first voltage
    class DoneScanning(Exception):
        pass
    try:
        di_task.start()
        while (di_task.is_task_done()==False):
            continue
    except DoneScanning:
        return
07-13-2025 07:51 PM
If you want faster execution speed, don't use Python. Python is an interpreted language and can be up to 100x slower than C.
Use LabVIEW, C, or C# for high-performance programming. See: Using NI-DAQmx in Text Based Programming Environments
07-13-2025 11:35 PM
A better way is to make this hardware-timed: write the whole series of voltages to the AO task as an array, configure the AO sample clock to come from a PFI line with the falling edge as the active edge, and connect the digital signal to that PFI line.
Now, without the intervention of your software, the hardware will generate one sample from the array for every falling edge of the signal connected to the PFI as sample clock.
07-14-2025 06:33 PM
Thanks for the suggestion! This worked great and I'm getting the timing I would expect now. In case this helps anyone else, I included my code below.
import time

import nidaqmx
from nidaqmx.constants import AcquisitionType, Edge, VoltageUnits

# get_galvo_voltages and get_terminal_name_with_dev_prefix are defined elsewhere (not shown)

def bnc2110_diao_sync_test():
    print("Starting...")
    voltages = get_galvo_voltages() # function which returns an array of the voltages I must set the analog output port (galvo mirror) to
    di_task = nidaqmx.Task()
    ao_task = nidaqmx.Task()
    di_task.di_channels.add_di_chan("Dev1/PFI1")
    di_task.start()
    terminal_name = get_terminal_name_with_dev_prefix(di_task, "PFI1")
    ao_task.ao_channels.add_ao_voltage_chan("Dev1/ao1",min_val=-9.5,max_val=9.5,units=VoltageUnits.VOLTS)
    ao_task.timing.cfg_samp_clk_timing(8000,source=terminal_name,active_edge=Edge.FALLING,sample_mode=AcquisitionType.FINITE,samps_per_chan=len(voltages))
    ao_task.triggers.start_trigger.cfg_dig_edge_start_trig(terminal_name,trigger_edge=Edge.FALLING)
    ao_task.start()
    ao_task.write(voltages)
    ao_task.wait_until_done()
    time.sleep(0.5) # this is necessary right now because the function ends without writing all of the voltages otherwise
    di_task.stop()
    di_task.close()
    ao_task.stop()
    ao_task.close()
07-14-2025 11:49 PM
You don't need the di_task, just the ao_task with the PFI1 terminal as clock source will achieve your goals.
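A minimal sketch of that AO-only version, assuming the same device name (Dev1), output channel (ao1), and PFI1 wiring as the code posted above. The names `pfi_terminal` and `generate_on_falling_edges` are hypothetical, the 8000 Hz figure is just the expected maximum edge rate (one edge per 125 microseconds), and this has not been run against hardware.

```python
def pfi_terminal(device: str, pfi: int) -> str:
    """Build a fully qualified terminal name such as '/Dev1/PFI1'."""
    return f"/{device}/PFI{pfi}"


def generate_on_falling_edges(voltages, device="Dev1", ao="ao1", pfi=1,
                              max_edge_rate_hz=8000.0, timeout_s=10.0):
    """Output one voltage per falling edge on the PFI line, using only an AO task."""
    # Imported here so pfi_terminal() stays usable on machines without the NI driver.
    import nidaqmx
    from nidaqmx.constants import AcquisitionType, Edge

    with nidaqmx.Task() as ao_task:
        ao_task.ao_channels.add_ao_voltage_chan(
            f"{device}/{ao}", min_val=-9.5, max_val=9.5
        )
        # The external signal on the PFI line is the AO sample clock;
        # the rate argument only tells the driver the fastest clock to expect.
        ao_task.timing.cfg_samp_clk_timing(
            max_edge_rate_hz,
            source=pfi_terminal(device, pfi),
            active_edge=Edge.FALLING,
            sample_mode=AcquisitionType.FINITE,
            samps_per_chan=len(voltages),
        )
        # Fill the output buffer first, then start; samples are clocked out by the edges.
        ao_task.write(voltages, auto_start=False)
        ao_task.start()
        ao_task.wait_until_done(timeout=timeout_s)
```

Calling `generate_on_falling_edges(get_galvo_voltages())` would then replace both tasks from the earlier code. Whether the extra sleep is still needed with this ordering is something to check on the actual hardware.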