Real-Time Measurement and Control

With nidaqmx in Python, how can I stop a task partway through (while task.write(signal) is running)?

Hi all!
I am writing code to output a signal with the NI-DAQmx Python API on NI's USB-6314. I want to be able to write a signal (a sine wave, for example) and, if needed, stop the write partway through and start writing a new function (basically, update the function being output on the fly).

To figure this out, I tried running the write in a separate thread with the following code:

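import threading
import time

import nidaqmx

import util  # local helper module (not shown here): signal parameters and fct1()


def test_stop(task):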
    print("stopping in 2 seconds")
    time.sleep(2)
    print("attempting to stop")
    task.stop()
    print("task has stopped")

device = "SimDev"

print("Signal parameters:\
       \nAmplitude: {}\
       \nFrequency: {}\
       \nDuration: {}\
      ".format(util.ampl,
               util.freq,
               util.duration))

dt, signal = util.fct1(util.ampl, util.freq, util.duration, util.sampling_rate)


# WRITE in thread - STOP in main
start_time = time.time()
print("\n=================================\nWRITE in thread - STOP in main")
with nidaqmx.Task() as task:
    task.ao_channels.add_ao_voltage_chan(device+"/ao0")
    task.timing.cfg_samp_clk_timing(util.sampling_rate)

    task.start()

    thread1 = threading.Thread(target=task.write, args=(signal,))
    thread1.start()
    print("thread start")
    test_stop(task)

stop_time = time.time()
print("\nOverall time: {:.2f}s\
        \nSignal duration: {}s\
        ".format(stop_time-start_time,util.duration))

With this code, I would expect the overall time to be 2 seconds (due to the time.sleep(2)), but I get 10 seconds (=util.duration), showing that the signal is not stopped mid-task.

Any help would be greatly appreciated!

Message 1 of 4

What if you place the start_time before task.start()?
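
To see which step the time is actually spent in, it may help to time each phase separately instead of only the whole run. A minimal sketch, assuming a hypothetical `timed` helper (not part of nidaqmx) and using `time.sleep` as a stand-in for the real DAQmx calls:

```python
import time
from contextlib import contextmanager

timings = {}  # phase name -> elapsed seconds


@contextmanager
def timed(label):
    """Record how long the enclosed block takes (hypothetical helper)."""
    t0 = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = time.perf_counter() - t0


# time.sleep stands in for the real calls (task.start(), task.stop(), ...)
with timed("start"):
    time.sleep(0.01)
with timed("write"):
    time.sleep(0.05)

for label, elapsed in timings.items():
    print(f"{label}: {elapsed:.3f}s")
```

Wrapping `task.start()`, the stop call, and the exit of the `with nidaqmx.Task()` block in separate `timed(...)` sections would show which of them accounts for the 10 seconds.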

-------------------------------------------------------
Applications Engineer | TME Systems
https://tmesystems.net/
-------------------------------------------------------
https://github.com/ZhiYang-Ong
Message 2 of 4

Hi! Thanks for the reply! Unfortunately, the result is the same. I would "simply" like to find a way to update a continuous signal on the fly. There must be something made for this, since it is hardly an unusual feature, but I just can't seem to find anything...

Message 3 of 4

I am running it on a simulated device, so it only takes 1 second, but it should take less than 3 seconds on actual hardware (2000 samples at 1000 samples per second is 2 seconds of output).

import nidaqmx
import time
import numpy as np


cycles = 2 # how many sine cycles
resolution = 2000 # how many datapoints to generate
length = np.pi * 2 * cycles
my_wave = np.sin(np.arange(0, length, length / resolution))


with nidaqmx.Task() as task:
    task.ao_channels.add_ao_voltage_chan('Dev1/ao0')

    task.timing.cfg_samp_clk_timing(1000)

    start_time = time.time()
    print('1 Channel N Samples Write: ')
    print(task.write(my_wave, auto_start=True))
    task.wait_until_done()
    task.stop()
    stop_time = time.time()
    print("\nOverall time: {:.2f}s".format(stop_time - start_time))

    # task.ao_channels.add_ao_voltage_chan('Dev1/ao1:3')

    # print('N Channel N Samples Write: ')
    # print(task.write([[1.1, 2.2, 3.3], [1.1, 2.2, 4.4],
    #                   [2.2, 3.3, 4.4], [2.2, 3.3, 4.4]],
    #                  auto_start=True))
    # task.wait_until_done()
    # task.stop()
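
The example above writes the whole waveform in one blocking call. One possible way to make such a write interruptible is to hand the data over in smaller chunks and check a stop flag between chunks. The sketch below is only an illustration of that pattern: `write_in_chunks` and `fake_write` are hypothetical names, and `fake_write` stands in for `task.write` so the example runs without a device. How the device buffer behaves with chunked writes is not covered here and would need checking against the NI-DAQmx documentation.

```python
import threading


def write_in_chunks(write_chunk, samples, chunk_size, stop_event):
    """Feed `samples` to `write_chunk` piece by piece (hypothetical helper).

    Returns the number of samples handed over before the data ran out
    or `stop_event` was set.
    """
    written = 0
    for i in range(0, len(samples), chunk_size):
        if stop_event.is_set():
            break  # stop requested: skip the remaining chunks
        chunk = samples[i:i + chunk_size]
        write_chunk(chunk)
        written += len(chunk)
    return written


# Stand-in for the hardware: collect chunks in a list instead of task.write
received = []
stop = threading.Event()


def fake_write(chunk):
    received.append(list(chunk))
    if len(received) == 3:  # simulate a stop request arriving mid-signal
        stop.set()


count = write_in_chunks(fake_write, list(range(1000)), 100, stop)
print(count)  # 300
```

The stop request is only checked between chunks, so smaller chunks react sooner. In a real program, another thread (or the main thread) would call `stop.set()`, and new data could be written once the loop has returned.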

 

Message 4 of 4