10-10-2021 10:48 PM
Problem Definition: I need to use Python to acquire data for about 40 minutes. The following code works fine, but the only problem is that I want to see the data while the acquisition is running. Is there a way to plot the data at a fixed refresh rate during the acquisition?
The code below is modified from this post.
Thank you all!
# This works for an NI USB-6251
import nidaqmx
from nidaqmx.constants import AcquisitionType
from nidaqmx import stream_readers
import numpy as np
import csv
import time
from datetime import datetime
# Acquire two analog-input voltage channels in a single blocking read and dump
# the samples, together with a time axis, to data.csv.

# --- Acquisition settings ----------------------------------------------------
sample_rate = 2000  # Sampling frequency in Hz
samples_to_acq = 2000  # Samples per channel in one iteration
wait_time = samples_to_acq / sample_rate  # Duration of one iteration in s
cont_mode = AcquisitionType.CONTINUOUS
iterations = 10

# Totals for the single read performed below: with the values above this is
# 10 iterations of 1 s each, i.e. 10 s and 20000 samples per channel.
total_wait_time = wait_time * iterations
samples_to_acq_new = samples_to_acq * iterations

# The read blocks for roughly total_wait_time seconds, so the timeout has to be
# longer than that. A fixed 10 s timeout is exactly the acquisition length here
# and would be far too short for longer runs.
read_timeout = total_wait_time + 5.0

with nidaqmx.Task() as task:
    now = datetime.now()
    military = now.strftime('%H:%M:%S')
    first_header = ['Event 1']
    second_header = [f'T. Captura: {military}']

    # Two voltage channels
    task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
    task.ai_channels.add_ai_voltage_chan("Dev1/ai1")

    # Sets the sample clock rate and mode; samps_per_chan is passed the total
    # number of samples that will be read.
    # NOTE(review): in CONTINUOUS mode the driver presumably treats
    # samps_per_chan as a buffer size rather than a sample count -- confirm
    # against the nidaqmx documentation.
    task.timing.cfg_samp_clk_timing(
        sample_rate,
        sample_mode=cont_mode,
        samps_per_chan=samples_to_acq_new,
    )

    start = time.time()
    print('Starting task...')  # Just to keep a control in your console

    # One row per channel, one column per sample.
    data = np.empty((2, samples_to_acq_new), dtype=np.float64)
    reader = stream_readers.AnalogMultiChannelReader(task.in_stream)
    reader.read_many_sample(data, samples_to_acq_new, timeout=read_timeout)

# The task is closed at this point; only file output remains.
# Saving the 2 channels to a csv file. This file is overwritten every time the
# program is executed. It is created in the current working directory.
with open('data.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(first_header)
    writer.writerow(second_header)
    # Two blank rows between the headers and the data
    writer.writerow([])
    writer.writerow([])

    # Time axis in seconds: sample k is taken at k / sample_rate. (linspace
    # with the endpoint included would stretch the spacing to T / (N - 1).)
    x = np.arange(samples_to_acq_new) / sample_rate
    writer.writerows(zip(x, data[0], data[1]))

elapsed_time = time.time() - start
print(f'done in {elapsed_time}')