LabWindows/CVI

cancel
Showing results for 
Search instead for 
Did you mean: 

Thread Callback is never called - CVI bug?

Hello everyone,

 

I am working on multithreaded application. Each thread must make some computation on part of large array of integers.

 

To gather my data immediately after I finish, I try to use TCQCallback, which is created like in the following line:

 

CmtInstallTSQCallback(TSQueues[n], 
EVENT_TSQ_ITEMS_IN_QUEUE, 
data_size, 
tsqCopyImagePart, 
0, 
CmtGetCurrentThreadID(), 
&TSQCallbackID[n]);

 

where:

data_size = columns * rows of created part of array, but it doesn't really matter - whatever number I choose, it never calls the callback,

tsqCopyImage is my function.

 

Function code:

// Thread function
int CVICALLBACK computePartFunction (void* fcnDataStructure) {

	tData* threadDataPart = (tData*) calloc (1, sizeof(tData));
	threadDataPart = (tData*) fcnDataStructure;

	int currentThread = CmtGetCurrentThreadID();
	int i, j;
	
	int row_size = threadDataPart->row_size;
	int col_size = threadDataPart->col_size;

	
	int* tsq_write_ptr;
	int free_space_left;
	
	int tsqHandle = threadDataPart->tsqHandle;

//	CmtGetTSQWritePtr(tsqHandle, &tsq_write_ptr, &free_space_left);
//	printf("ID: %d, Free space left: %d,  Expected: %d \n", currentThread, free_space_left, row_size * col_size);

	int counter = 0;
	int items_flushed;
	int items_stored;
	int space_left;
	
	// Let's compute it!
	for (i = 0; i < row_size; i++) {
		for (j = 0; j < col_size; j++) {
			
			counter++;
			
			// In real application, it is a function of inputImgPart elements
			// - this app serves other purposes - thread management and processing
			threadDataPart->outputImgPart[i][j] = currentThread + j;
//			*tsq_write_ptr++ = currentThread + j; // threadDataPart->outputImgPart[i][j];
		}

		CmtWriteTSQData(tsqHandle, threadDataPart->outputImgPart[i], col_size, TSQ_INFINITE_TIMEOUT, &items_flushed);
		CmtGetTSQAttribute(tsqHandle, ATTR_TSQ_QUEUE_FREE_SPACE, &space_left);
		CmtGetTSQAttribute(tsqHandle, ATTR_TSQ_ITEMS_IN_QUEUE, &items_stored);
		printf("ID: %d, Counter = %d, Items flushed: %d, Items stored: %d, space left: %d \n", currentThread, counter, items_flushed, items_stored, space_left);
		
	}

	int items_written;
//    CmtReleaseTSQWritePtr (tsqHandle, &items_written);
//	printf("ID: %d, Counter = %d, Items written to the queue: %d,  Expected: %d \n", currentThread, counter, items_written, row_size * col_size);

	free(threadDataPart);
	return 0;
}

I tried all possible event indicators (EVENT_TSQ_ITEMS_IN_QUEUE,  EVENT_TSQ_QUEUE_SIZE,  EVENT_TSQ_QUEUE_SPACE_FREE ) and I understand how they are supposed to work. It seems like it is some kind of a bug.

 

At first I thought that maybe TSQ's are local, but I immediately reminded myself how stupid that would be - it's purpose is to exchange data between threads, so it must be visible in any thread I want.

 

The only related information about similar issur I found here: http://forums.ni.com/t5/LabWindows-CVI/TSQ-callback-not-called/m-p/862685#M40055 , but I can't even imagine that this could be not fixed yet after all these years.

 

Please, check if there is no bug related to the issue or check if there is no misunderstanding in my thinking. Any suggestion would be appreciated.

0 Kudos
Message 1 of 13
(6,807 Views)

Which thread is supposed to handle TSQ callback? If it''s the main thread then all should be OK,  as the main thread run with a RunUserInterface that processes events (but be sure not to be stuck into a tight loop that prevents event processing). Other threads do not process events by themselves unless you explicitly code that,  so this may be a problem if you want to gather data from the queue in a separate thread. This is explained in the help for callbackThreadID parameter of the function.

 

Besides that, I don't see a link between TSQueues [n] and threadDataPart->TSQ handle,  unless you copy the handle somewhere in the appropriate data structure: chek the correct handle is copied in this case.

 

Finally: are you trapping errors in actual code? Are you receiving errors while running it?



Proud to use LW/CVI from 3.1 on.

My contributions to the Developer Community
________________________________________
If I have helped you, why not giving me a kudos?
0 Kudos
Message 2 of 13
(6,722 Views)

 --- Update posted in next post ---

0 Kudos
Message 3 of 13
(6,690 Views)

Thank you for response and sorry for delay - the code was unavailable to be during weekend and thus, couldn not proceed with informative answer. TSQ callback is handled by main thread (by CmtGetCurrentThreadID() called inside main), so there shouldn't be any problems indeed. I already tested it and processing data by my thread is OK. When I stored my data inside struct passed to each thread, I can get expected data.

 

The reason why I use TSQ here is that I want to get results as fast as possible and save as much of the computation resources as possible. My second option is to use loop scanning through all threads, getting its' attributes about status and running function collecting part of returned struct, which would probably eat singifcant part of resources in one of my (not too many) threads. For these reasons, I would like to avoid it if it is possible.

 

TSQueues[n] is an array, which stores IDs for each queue (one per thread). It is passing correctly to the thread and I already checked that - IDs in TSQueues[n] match those passed to each thread and there is no error for sure (I print them out any time I run my app and use them).

 

Here is my struct holding data to process:

 

typedef struct threadData {
	
	int** inputImgPart;		// Array of pixels - not calculated
	int** outputImgPart;	        // Array of pixels - calculated and ready to copy
	
	int col_size;	        // Dimensions
	int row_size;
	
	int minRow;		// Positioning in full array
	int maxRow;
	
	int tsqHandle;	// Pointer to tsqHandle
	
} tData;

and the following lines handle thread IDs and TSQ processing:

 

	CmtTSQHandle* TSQueues;
	TSQueues = (CmtTSQHandle*) malloc (no_of_threads * sizeof(CmtTSQHandle));

	int* TSQCallbackID;
	TSQCallbackID = (int*) malloc (no_of_threads * sizeof(int));

        // ...

int free_space; // Divide the array, execute threads for (n = 0; n < no_of_threads; n++) { // New Thread - Safe Queue (TSQ) for each thread - we can instantly copy data to main thread data_size = part_rows * part_cols; CmtNewTSQ(data_size, sizeof(int), OPT_TSQ_DYNAMIC_SIZE, &TSQueues[n]); CmtGetTSQAttribute(TSQueues[n], ATTR_TSQ_QUEUE_FREE_SPACE, &free_space); printf("Free space for n = %d : %d \n", n, free_space); CmtInstallTSQCallback(TSQueues[n], EVENT_TSQ_ITEMS_IN_QUEUE, data_size, tsqCopyImagePart, 0, CmtGetCurrentThreadID(), &TSQCallbackID[n]); threadDataParts[n]->tsqHandle = TSQueues[n]; // Time to call my thread CmtScheduleThreadPoolFunction(threadPoolHandle, computePartFunction, threadDataParts[n], &threadFunctionID[n]); }

 

Here is my testing code - it checks exactly the same condition that is checked by CmtInstallTSQCallback (which is hitting data_size number by number of items in queue in this case). It proves that storing data in TSQ works as expected - I managed to hit it with any of the 3 conditions:

 

	int count_callbacks = 0;
	for (;;) {
		count++;
		for (n = 0; n < no_of_threads; n++) {
			CmtGetTSQAttribute(TSQueues[n], ATTR_TSQ_ITEMS_IN_QUEUE, &items_in_queue);
			if (threadDataParts[n]->col_size * threadDataParts[n]->row_size == items_in_queue) {
				printf("%d should have executed this callback...\n", n);
				count_callbacks++;
			}
		}
		
		if (count_callbacks > 100)
			break;
	}

I do not get any errors during compilation of my code - the only thing that does not work here is no call for my callback function no matter what I try (EVENT_TSQ_QUEUE_SIZE with threshold data_size, EVENT_TSQ_ITEMS_IN_QUEUE with threshold data_size or EVENT_TSQ_QUEUE_SPACE_FREE with threshold 0 - I just want to pass whole, computed part of array immediately). Callback function reading data is in progress, but it does not matter, as my program never reaches this function (I just try to print anything to the console out of it).

 

0 Kudos
Message 4 of 13
(6,692 Views)

Ok, I just wrote data collection - I checked that any TSQ callback is always installed on my main thread. Here is (rather simple) code for this, which stores each part of my array in main array. There should not be any problems with it - the idea is to copy data in row-by-row fashion, pretty much the same thing happens during storing processed array inside my queues. There also should not be any problems with access to the main array by multiple queues, as any data store happens inside one, main thread.

 

void CVICALLBACK tsqCopyImagePart (CmtTSQHandle queueHandle, unsigned int event, int value, void *callbackData) {
	int mask = 0;
	char* ok = "not ok";
	printf("ID %d: I reached that function! :D\n", value);  // I hope to say so... :)
	
	// Get to the data we need to copy image part appropriately
	tcbData* partData;
	partData = (tcbData*) callbackData; 
	
	int i;
	int min_row = partData->minRow;
	int max_row = partData->maxRow;
	int row_size = max_row - min_row + 1;
	int col_size = partData->col_size;
	
	for (i = min_row; i <= max_row; i++) {
		CmtReadTSQData(queueHandle, sampleImage[i+min_row], col_size, TSQ_INFINITE_TIMEOUT, 0);
	}
}

 

 

One more question: is it correct to install the same callback function for more than one TSQ? (I mean just the function, not passed callback data nor thread - I assume I can do that since I get no errors, but I try to find the solution by writing up my semi-theories during process - this might help at some point I believe.. 🙂 )

0 Kudos
Message 5 of 13
(6,647 Views)

Are you compiling on 32 or 64 bit? I say that because you are actually casting the queue handle into an int, while it really is defined as an intptr_t which is 8 bytes long in a 64bit environment.

Additionally, I see you have no run-time error trap: does the calls to TSQ functions return any error during execution?

 

Besides that I see no evident reason for not calling the queue callback. What happens if you set no_of_threads to 1? Is the callback ever called?

 

It appears that you are filling an array in a global memory struct and then you pass again the whole data into the TSQ: if this is your real scenario and not a reduced code skeleton for this discussion only, then you could replace the TSQ with a PostDeferredCall to a function that handles the data, passing the item (thread) number into function callbackData.



Proud to use LW/CVI from 3.1 on.

My contributions to the Developer Community
________________________________________
If I have helped you, why not giving me a kudos?
Message 6 of 13
(6,633 Views)

Thank you for suggestions. I compile it on 32 bit, so this shouldn't matter – they both should be 4 bytes long. I tried to switch that – inside header it throws error of 'unknown type name CmtTSQHandle' inside my struct. However, when I moved struct definition to main file, it worked fine, but didn't really matter, so I switched it back to int casting.

 

Setting no_of_threads to 1 worked as expected – it just processes whole array in one thread and I receive it in passed, global struct.

 

The solution for this problem was in event handling. I forgot to add this line:

 

CmtWaitForThreadPoolFunctionCompletion (threadPoolHandle, threadFunctionID[n], OPT_TL_PROCESS_EVENTS_WHILE_WAITING);

However, I have some additional questions to make a good understanding what was / is going on over there. I hope I am not overusing the kindness I met here and will be able to help other people with my (still modest, but growing) experience in the future.

 

 

1. Consider two different code snippets:

 

// Case #1 - Schedule waiting for function completion right after I schedule thread itself:
// Use threads to modify parts of array for (n = 0; n < no_of_threads; n++) { // Creating structs for threads here

// ...

// Announce thread CmtScheduleThreadPoolFunction(threadPoolHandle, computePartFunction, threadDataParts[n], &threadFunctionID[n]); CmtWaitForThreadPoolFunctionCompletion (threadPoolHandle, threadFunctionID[n], OPT_TL_PROCESS_EVENTS_WHILE_WAITING); } for (n = 0; n < no_of_threads; n++) { CmtUninstallTSQCallback(TSQueues[n], TSQCallbackID[n]); }
// Case #2 - Schedule all threads and after that, schedule all waiting functions:
 
	// Use threads to modify parts of array
	for (n = 0; n < no_of_threads; n++) {

		// Creating structs for threads here

                // ...

		// Announce thread
		CmtScheduleThreadPoolFunction(threadPoolHandle, computePartFunction, threadDataParts[n], &threadFunctionID[n]);
	}

        // After threads are created and run, I schedule waiting
	for (n = 0; n < no_of_threads; n++) {
		CmtWaitForThreadPoolFunctionCompletion (threadPoolHandle, threadFunctionID[n], OPT_TL_PROCESS_EVENTS_WHILE_WAITING); 
	}
	
	for (n = 0; n < no_of_threads; n++) {
		CmtUninstallTSQCallback(TSQueues[n], TSQCallbackID[n]);
	}

The first case works correctly and the second seems to have the same problem, as I had earlier. My semi-theory for that behaviour is that int the Case #2 I scheduled CmtWaitForThreadPoolFunctionCompletion, but right after that I uninstall queue callbacks, so the program had not enough time to loop through the events even once to call my callbacks (I just removed them from event queue). But then, it should not call my last callback, because this would give even less time for the last callback to be noticed!

 

 

2. This works as expected - I can loop through events and process them during thread processing:

 

CmtWaitForThreadPoolFunctionCompletion (threadPoolHandle, threadFunctionID[n], OPT_TL_PROCESS_EVENTS_WHILE_WAITING); 

However, I would expect that the following line would hang my program until thread finishes computation. In other words, this would cause REAL waiting for this job done, not the virtual one letting the program go on with other things like when using the above line of code. Thus, it would be far more useful in this situation:

 

CmtWaitForThreadPoolFunctionCompletion (threadPoolHandle, threadFunctionID[n], 0);

Instead, AGAIN I meet the same lack of callbacks like before. My intuition tells me that here, 0 also prevents me from looping through events and as a result, it finishes and the I loop through with removed callback events detectors. If it really works like that, waiting for the end of the thread is stripped from any possible callbacks, which doesn't make much sense! 

 

 

3. Besides, I would like to make it in a more 'local' fashion, I just couldn't find any sensible way to do so. I have to pass struct to part of array somehow and creating local copies of it seems more legit (maybe direct streaming using TSQ?), then I would appreciate any suggestion. If there would be any level of locality available as an option, I guess it would speed up the process a bit.

 

 

Apart from that, any reader of this text should consider it as an attempt to understand the mechanics here, not a rant (even if I might lack patience at the moment 😉 ). Feel free to correct my mistakes or read for better understanding, I will greatly appreciate it.

0 Kudos
Message 7 of 13
(6,622 Views)

I took some time to respond trying to guess what can be happening but  with no new ideas on this subject.

 

Nevertheless, I suggest you revise your scenario and rethink how it's supposed to work starting from a basic concept: calling CmtWaitForThreadPoolFunctionCompletion is not needed at all! You can develop a multithreaded program that does not make use of this function and works correctly. What this function is adding to your scenario is the ability to process events while waiting for completion of the thread function, and here we come back to original question: is your main thread processing events? Or: are you sure you are installing queue function in the main thread? Have you tried using CmtGetMainThreadID instead of CmtGetCurrentThreadID while installing the queue functions?

 

I suggest you take a look at samples\utility\Threading\ThreadSafeQueue\Overflow\Overflow.prj for an exhaustinve example on using thread safe queues: you will notice, among other things, that it uses cmtWaitForThreadFunctionCompletion only at program end, mainly to avoid being stuck in a running thread function while the program is supposed to terminate.



Proud to use LW/CVI from 3.1 on.

My contributions to the Developer Community
________________________________________
If I have helped you, why not giving me a kudos?
0 Kudos
Message 8 of 13
(6,580 Views)

I tried all possible scenarios and it seems like CmtWaitForThreadPoolFunctionCompletion() is the only way to make it hit my callbacks. At first, I thought that I uninstall my TSQ Callbacks before threads are done with computing. If so, Delay() would work perfectly well, but setting Delay (1) after computing simple addition on array of 256 elements (which takes no more than 0.01 sec) doesn't change anything. Then I thought that Delay() might stop event scanning for some crazy reason and I input a huge for loop, but this didn't work as well. I print my table right after uninstalling my TSQs, so there is no way I would print it out too early - everything should be done at this point and we can't use the callback anyway. 

 

I already have seen the example you referenced and I can see that this should work without it, which is really confusing. I try to imagine myself what might be the change that this line really brings to the code, apart from what you have just pointed out. I seriously would not like to put it on the shelf 'Magic' and use it when it is required.

 

Every TSQ callback is called at the main thread - CmtGetMainThreadID() and CmtGetCurrentThreadID() result in the same ID. Changing thread pool to DEFAULT_THREAD_POOL_HANDLE doesn't change the situation, so it is not a matter of thread pool. 

 

The scenario in my application looks like the following:
1. Create new thread pool with N threads.
2. Divide original array into N new, allocated chunks by rows.
3. For every potential thread:
    3a. Allocate and fill structure passed thread as array of N structs.
    3b. Divide array to smaller N chunks and store inside structs.
    3c. Create TSQ callbacks waiting to fill up every TSQ with data.
    3d. Start threads and wait for the data in TSQ.
    3e. Call CmtWaitForThreadPoolFunctionCompletion().
4. Collect data immediately from every thread once the TSQ callback is called.
5. Call CmtUninstallTSQThread() for every TSQ created earlier.
6. Print the original array with uploaded (or not) results from threads.

 

To my Thread, I pass struct containing input array part, empty output array (which is not really necessary, but was useful in testing it), dimensions and position. To my TSQ Callback, I pass one struct, which contains only array part dimensions and pointer to original array).

I can't really imagine myself what else might cause this bug to happen.

 

0 Kudos
Message 9 of 13
(6,574 Views)

Well, I cannot argue more than what I have done till now without an actual and working code to look at.

If you can provide a working example that exposes the anomaly me or somebody else could examine the situation in detail.



Proud to use LW/CVI from 3.1 on.

My contributions to the Developer Community
________________________________________
If I have helped you, why not giving me a kudos?
0 Kudos
Message 10 of 13
(6,565 Views)