05-27-2020 01:22 AM
Hello there,
I am trying to synchronize the analogue write and analogue read calls in the Nidaqmx package for Python. I can successfully write an arbitrary pulse train, however when I read on the analogue input channel it misses the first pulse or few pulses. I don't think my triggers are synchronized. Any advice on how to modify my code would be greatly appreciated!
I have attached my python code.
Cheers!
Jordan.
05-27-2020 01:26 AM
import nidaqmx
import nidaqmx.system
import nidaqmx
import numpy as np
import matplotlib.pyplot as plt
system = nidaqmx.system.System.local()
system.driver_version
from nidaqmx import stream_writers
system = nidaqmx.system.System.local()
print(system.driver_version)
for device in system.devices:
dev = str(device).split('=')[1].split(')')[0]
print(device)
print(dev)
def fire_single_shot(dev,channel,pulse_duration):
channel = dev +'/' + channel
test_Task = nidaqmx.Task()
test_Task.ao_channels.add_ao_voltage_chan(channel)
test_Task.timing.cfg_samp_clk_timing(rate=1/pulse_duration*1000, sample_mode= nidaqmx.constants.AcquisitionType.FINITE, samps_per_chan= 2)
test_Writer = stream_writers.AnalogSingleChannelWriter(test_Task.out_stream, auto_start=True)
samples = np.append(5*np.ones(1),np.zeros(1))
test_Writer.write_many_sample(samples)
test_Task.wait_until_done(timeout=10)
test_Task.stop()
test_Task.close()
def fire_single_shot_2(dev,channel,pulse_duration):
channel = dev +'/' + channel
with nidaqmx.Task() as task:
task.ao_channels.add_ao_voltage_chan(channel)
task.timing.cfg_samp_clk_timing(rate=1/pulse_duration*1000, sample_mode= nidaqmx.constants.AcquisitionType.FINITE, samps_per_chan= 2)
Writer = stream_writers.AnalogSingleChannelWriter(task.out_stream, auto_start=True)
samples = np.append(5*np.ones(1),np.zeros(1))
Writer.write_many_sample(samples)
task.wait_until_done(timeout=10)
def fire_burst(dev,channel,pulse_on,pulse_off,pulse_count):
channel = dev +'/' + channel
with nidaqmx.Task() as task:
duty = pulse_on/(pulse_on+pulse_off) # duty cycle in %
N = 2**8 # no of points in stream array (8-bit)
array_on = int(N*duty) # on values in array
array_off = int(N-array_on) # off values in array
samples = np.append(5*np.ones(array_on),np.zeros(array_off))
task.ao_channels.add_ao_voltage_chan(channel)
task.timing.cfg_samp_clk_timing(rate=(array_on/N)*1/pulse_on*1000*N, sample_mode= nidaqmx.constants.AcquisitionType.FINITE , samps_per_chan= pulse_count*len(samples))
Writer = stream_writers.AnalogSingleChannelWriter(task.out_stream, auto_start=True)
Writer.write_many_sample(samples)
task.wait_until_done(timeout=20)
def fire_continuous(dev,channel,pulse_on,pulse_off):
channel = dev +'/' + channel
with nidaqmx.Task() as task:
duty = pulse_on/(pulse_on+pulse_off) # duty cycle in %
N = 2**8 # no of points in stream array
array_on = int(N*duty) # on values in array
array_off = int(N-array_on) # off values in array
samples = np.append(5*np.ones(array_on),np.zeros(array_off))
task.ao_channels.add_ao_voltage_chan(channel)
task.timing.cfg_samp_clk_timing(rate=(array_on/N)*1/pulse_on*1000*N, sample_mode= nidaqmx.constants.AcquisitionType.CONTINUOUS)
Writer = stream_writers.AnalogSingleChannelWriter(task.out_stream, auto_start=True)
Writer.write_many_sample(samples)
task.wait_until_done(timeout=20)
ana_out = 'ao1'
ana_in ='ai4'
channel = dev +'/' + ana_out
channel_read = dev +'/' + ana_in
pulse_duration = 5 # pulse on time in ms
pulse_off = 15 # pulse off time in ms
pulse_on = 40 # pulse on time in ms
duty = pulse_duration/(pulse_duration+pulse_off) # duty cycle in %
N = 1000 # no of points in stream array
array_on = int(N*duty) # on values in array
array_off = int(N-array_on) # off values in array
pulse_count = int(6) # no of desired pulses in pulse train
samples = np.append(5*np.ones(array_on),np.zeros(array_off))
write_Task = nidaqmx.Task()
read_Task = nidaqmx.Task()
with nidaqmx.Task() as writeTask, nidaqmx.Task() as readTask:
writeTask.ao_channels.add_ao_voltage_chan(channel)
readTask.ai_channels.add_ai_voltage_chan(channel_read)
writeTask.timing.cfg_samp_clk_timing(rate=(array_on/N)*1/pulse_duration*1000*N, sample_mode= nidaqmx.constants.AcquisitionType.FINITE , samps_per_chan= pulse_count*len(samples))
readTask.timing.cfg_samp_clk_timing(rate=(array_on/N)*1/pulse_duration*1000*N, sample_mode= nidaqmx.constants.AcquisitionType.FINITE , samps_per_chan= pulse_count*len(samples))
#writeTask.triggers.start_trigger.cfg_dig_edge_start_trig(trigger_source = '/Dev2/ai4/') #Setting the trigger on the analog input
data = readTask.read(number_of_samples_per_channel=pulse_count*len(samples),timeout = nidaqmx.constants.WAIT_INFINITELY)
writer = stream_writers.AnalogSingleChannelWriter(writeTask.out_stream, auto_start=True)
writer.write_many_sample(samples)
#writer.wait_until_done(timeout=10)
data = readTask.read(number_of_samples_per_channel=pulse_count*len(samples),timeout = nidaqmx.constants.WAIT_INFINITELY)
print("The channels linked to WriteTask are: ")
for i in writeTask.ao_channels:
print(i)
plt.plot(data)