From 8b0b857eaaa50c6ec217a46c0577395c78ec04c7 Mon Sep 17 00:00:00 2001
From: sapier <Sapier at GMX dot net>
Date: Mon, 6 Jan 2014 12:45:42 +0100
Subject: [PATCH] Make MutexQueue use jsemaphore for signaling

---
 src/client.cpp                     |  18 +++-
 src/client.h                       |   8 ++
 src/connection.cpp                 |   5 +-
 src/game.cpp                       |  32 +++---
 src/httpfetch.cpp                  |   2 +-
 src/itemdef.cpp                    |   1 +
 src/jthread/jsemaphore.h           |   1 +
 src/jthread/pthread/jsemaphore.cpp |  31 ++++++
 src/jthread/win32/jsemaphore.cpp   |  15 +++
 src/shader.cpp                     |  24 +++--
 src/tile.cpp                       |   1 +
 src/util/container.h               | 151 ++++++++++++++++++++---------
 src/util/thread.h                  |  62 ++++++------
 13 files changed, 250 insertions(+), 101 deletions(-)

diff --git a/src/client.cpp b/src/client.cpp
index b830bcdf3..721c413c0 100644
--- a/src/client.cpp
+++ b/src/client.cpp
@@ -286,6 +286,20 @@ Client::Client(
 	}
 }
 
+void Client::Stop()
+{
+	//request all client managed threads to stop
+	m_mesh_update_thread.Stop();
+}
+
+bool Client::isShutdown()
+{
+
+	if (!m_mesh_update_thread.IsRunning()) return true;
+
+	return false;
+}
+
 Client::~Client()
 {
 	{
@@ -296,7 +310,7 @@ Client::~Client()
 	m_mesh_update_thread.Stop();
 	m_mesh_update_thread.Wait();
 	while(!m_mesh_update_thread.m_queue_out.empty()) {
-		MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_front();
+		MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_frontNoEx();
 		delete r.mesh;
 	}
 
@@ -692,7 +706,7 @@ void Client::step(float dtime)
 		while(!m_mesh_update_thread.m_queue_out.empty())
 		{
 			num_processed_meshes++;
-			MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_front();
+			MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_frontNoEx();
 			MapBlock *block = m_env.getMap().getBlockNoCreateNoEx(r.p);
 			if(block)
 			{
diff --git a/src/client.h b/src/client.h
index 1ed80a2b0..1b7ad48e6 100644
--- a/src/client.h
+++ b/src/client.h
@@ -289,6 +289,14 @@ class Client : public con::PeerHandler, public InventoryManager, public IGameDef
 	);
 	
 	~Client();
+
+	/*
+	 request all threads managed by client to be stopped
+	 */
+	void Stop();
+
+
+	bool isShutdown();
 	/*
 		The name of the local player should already be set when
 		calling this, as it is sent in the initialization.
diff --git a/src/connection.cpp b/src/connection.cpp
index 8f83f6219..bc9279649 100644
--- a/src/connection.cpp
+++ b/src/connection.cpp
@@ -592,8 +592,9 @@ void * Connection::Thread()
 		
 		runTimeouts(dtime);
 
+		//NOTE this is only thread safe for ONE consumer thread!
 		while(!m_command_queue.empty()){
-			ConnectionCommand c = m_command_queue.pop_front();
+			ConnectionCommand c = m_command_queue.pop_frontNoEx();
 			processCommand(c);
 		}
 
@@ -1556,7 +1557,7 @@ ConnectionEvent Connection::getEvent()
 		e.type = CONNEVENT_NONE;
 		return e;
 	}
-	return m_event_queue.pop_front();
+	return m_event_queue.pop_frontNoEx();
 }
 
 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
diff --git a/src/game.cpp b/src/game.cpp
index b751a2b62..aef60484f 100644
--- a/src/game.cpp
+++ b/src/game.cpp
@@ -813,7 +813,7 @@ class GameGlobalShaderConstantSetter : public IShaderConstantSetter
 		services->setVertexShaderConstant("animationTimer", &animation_timer_f, 1);
 
 		LocalPlayer* player = m_client->getEnv().getLocalPlayer();
-		v3f eye_position = player->getEyePosition(); 
+		v3f eye_position = player->getEyePosition();
 		services->setPixelShaderConstant("eyePosition", (irr::f32*)&eye_position, 3);
 		services->setVertexShaderConstant("eyePosition", (irr::f32*)&eye_position, 3);
 
@@ -1876,12 +1876,12 @@ void the_game(
 		}
 		else if(input->wasKeyDown(getKeySetting("keymap_screenshot")))
 		{
-			irr::video::IImage* const image = driver->createScreenShot(); 
-			if (image) { 
-				irr::c8 filename[256]; 
-				snprintf(filename, 256, "%s" DIR_DELIM "screenshot_%u.png", 
+			irr::video::IImage* const image = driver->createScreenShot();
+			if (image) {
+				irr::c8 filename[256];
+				snprintf(filename, 256, "%s" DIR_DELIM "screenshot_%u.png",
 						 g_settings->get("screenshot_path").c_str(),
-						 device->getTimer()->getRealTime()); 
+						 device->getTimer()->getRealTime());
 				if (driver->writeImageToFile(image, filename)) {
 					std::wstringstream sstr;
 					sstr<<"Saved screenshot to '"<<filename<<"'";
@@ -1891,8 +1891,8 @@ void the_game(
 				} else{
 					infostream<<"Failed to save screenshot '"<<filename<<"'"<<std::endl;
 				}
-				image->drop(); 
-			}			 
+				image->drop();
+			}
 		}
 		else if(input->wasKeyDown(getKeySetting("keymap_toggle_hud")))
 		{
@@ -2263,7 +2263,7 @@ void the_game(
 							new MainRespawnInitiator(
 									&respawn_menu_active, &client);
 					GUIDeathScreen *menu =
-							new GUIDeathScreen(guienv, guiroot, -1, 
+							new GUIDeathScreen(guienv, guiroot, -1,
 								&g_menumgr, respawner);
 					menu->drop();
 					
@@ -2755,7 +2755,7 @@ void the_game(
 				
 				// Sign special case, at least until formspec is properly implemented.
 				// Deprecated?
-				if(meta && meta->getString("formspec") == "hack:sign_text_input" 
+				if(meta && meta->getString("formspec") == "hack:sign_text_input"
 						&& !random_input
 						&& !input->isKeyDown(getKeySetting("keymap_sneak")))
 				{
@@ -3222,7 +3222,7 @@ void the_game(
 
 				driver->getOverrideMaterial().Material.ColorMask = irr::video::ECP_RED;
 				driver->getOverrideMaterial().EnableFlags  = irr::video::EMF_COLOR_MASK;
-				driver->getOverrideMaterial().EnablePasses = irr::scene::ESNRP_SKY_BOX + 
+				driver->getOverrideMaterial().EnablePasses = irr::scene::ESNRP_SKY_BOX +
 															 irr::scene::ESNRP_SOLID +
 															 irr::scene::ESNRP_TRANSPARENT +
 															 irr::scene::ESNRP_TRANSPARENT_EFFECT +
@@ -3433,6 +3433,16 @@ void the_game(
 	chat_backend.addMessage(L"", L"# Disconnected.");
 	chat_backend.addMessage(L"", L"");
 
+	client.Stop();
+
+	//force answer all texture and shader jobs (TODO return empty values)
+
+	while(!client.isShutdown()) {
+		tsrc->processQueue();
+		shsrc->processQueue();
+		sleep_ms(100);
+	}
+
 	// Client scope (client is destructed before destructing *def and tsrc)
 	}while(0);
 	} // try-catch
diff --git a/src/httpfetch.cpp b/src/httpfetch.cpp
index 9eed045fe..176a3b22a 100644
--- a/src/httpfetch.cpp
+++ b/src/httpfetch.cpp
@@ -594,7 +594,7 @@ class CurlFetchThread : public JThread
 			*/
 
 			while (!m_requests.empty()) {
-				Request req = m_requests.pop_front();
+				Request req = m_requests.pop_frontNoEx();
 				processRequest(req);
 			}
 			processQueued(&pool);
diff --git a/src/itemdef.cpp b/src/itemdef.cpp
index f77a198b5..d5e03f7b3 100644
--- a/src/itemdef.cpp
+++ b/src/itemdef.cpp
@@ -642,6 +642,7 @@ class CItemDefManager: public IWritableItemDefManager
 	void processQueue(IGameDef *gamedef)
 	{
 #ifndef SERVER
+		//NOTE this is only thread safe for ONE consumer thread!
 		while(!m_get_clientcached_queue.empty())
 		{
 			GetRequest<std::string, ClientCached*, u8, u8>
diff --git a/src/jthread/jsemaphore.h b/src/jthread/jsemaphore.h
index 70318d5da..b62add253 100644
--- a/src/jthread/jsemaphore.h
+++ b/src/jthread/jsemaphore.h
@@ -36,6 +36,7 @@ class JSemaphore {
 
 	void Post();
 	void Wait();
+	bool Wait(unsigned int time_ms);
 
 	int GetValue();
 
diff --git a/src/jthread/pthread/jsemaphore.cpp b/src/jthread/pthread/jsemaphore.cpp
index 962b582f1..ee1431065 100644
--- a/src/jthread/pthread/jsemaphore.cpp
+++ b/src/jthread/pthread/jsemaphore.cpp
@@ -17,8 +17,12 @@ with this program; if not, write to the Free Software Foundation, Inc.,
 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
 #include <assert.h>
+#include <errno.h>
+#include <sys/time.h>
 #include "jthread/jsemaphore.h"
+
 #define UNUSED(expr) do { (void)(expr); } while (0)
+
 JSemaphore::JSemaphore() {
 	int sem_init_retval = sem_init(&m_semaphore,0,0);
 	assert(sem_init_retval == 0);
@@ -49,6 +53,33 @@ void JSemaphore::Wait() {
 	UNUSED(sem_wait_retval);
 }
 
+bool JSemaphore::Wait(unsigned int time_ms) {
+	struct timespec waittime;
+	struct timeval now;
+
+	if (gettimeofday(&now, NULL) == -1) {
+		assert("Unable to get time by clock_gettime!" == 0);
+		return false;
+	}
+
+	waittime.tv_nsec = ((time_ms % 1000) * 1000 * 1000) + (now.tv_usec * 1000);
+	waittime.tv_sec  = (time_ms / 1000) + (waittime.tv_nsec / (1000*1000*1000)) + now.tv_sec;
+	waittime.tv_nsec %= 1000*1000*1000;
+
+	errno = 0;
+	int sem_wait_retval = sem_timedwait(&m_semaphore,&waittime);
+
+	if (sem_wait_retval == 0)
+	{
+		return true;
+	}
+	else {
+		assert((errno == ETIMEDOUT) || (errno == EINTR));
+		return false;
+	}
+	return sem_wait_retval == 0 ? true : false;
+}
+
 int JSemaphore::GetValue() {
 
 	int retval = 0;
diff --git a/src/jthread/win32/jsemaphore.cpp b/src/jthread/win32/jsemaphore.cpp
index 3a1f2715c..34167f391 100755
--- a/src/jthread/win32/jsemaphore.cpp
+++ b/src/jthread/win32/jsemaphore.cpp
@@ -51,6 +51,21 @@ void JSemaphore::Wait() {
 			INFINITE);
 }
 
+bool JSemaphore::Wait(unsigned int time_ms) {
+	unsigned int retval = WaitForSingleObject(
+			m_hSemaphore,
+			time_ms);
+
+	if (retval == WAIT_OBJECT_0)
+	{
+		return true;
+	}
+	else {
+		assert(retval == WAIT_TIMEOUT);
+		return false;
+	}
+}
+
 int JSemaphore::GetValue() {
 
 	long int retval = 0;
diff --git a/src/shader.cpp b/src/shader.cpp
index 39296f6a3..d29c9d3a7 100644
--- a/src/shader.cpp
+++ b/src/shader.cpp
@@ -427,21 +427,18 @@ u32 ShaderSource::getShaderId(const std::string &name)
 		/* infostream<<"Waiting for shader from main thread, name=\""
 				<<name<<"\""<<std::endl;*/
 
-		try{
-			while(true) {
-				// Wait result for a second
-				GetResult<std::string, u32, u8, u8>
-					result = result_queue.pop_front(1000);
-
-				if (result.key == name) {
-					return result.item;
-				}
+		while(true) {
+			GetResult<std::string, u32, u8, u8>
+				result = result_queue.pop_frontNoEx();
+
+			if (result.key == name) {
+				return result.item;
+			}
+			else {
+				errorstream << "Got shader with invalid name: " << result.key << std::endl;
 			}
 		}
-		catch(ItemNotFoundException &e){
-			errorstream<<"Waiting for shader " << name << " timed out."<<std::endl;
-			return 0;
-		}
+
 	}
 
 	infostream<<"getShaderId(): Failed"<<std::endl;
@@ -537,6 +534,7 @@ void ShaderSource::processQueue()
 	/*
 		Fetch shaders
 	*/
+	//NOTE this is only thread safe for ONE consumer thread!
 	if(!m_get_shader_queue.empty()){
 		GetRequest<std::string, u32, u8, u8>
 				request = m_get_shader_queue.pop();
diff --git a/src/tile.cpp b/src/tile.cpp
index e003c3020..b8080c708 100644
--- a/src/tile.cpp
+++ b/src/tile.cpp
@@ -775,6 +775,7 @@ void TextureSource::processQueue()
 	/*
 		Fetch textures
 	*/
+	//NOTE this is only thread safe for ONE consumer thread!
 	if(!m_get_texture_queue.empty())
 	{
 		GetRequest<std::string, u32, u8, u8>
diff --git a/src/util/container.h b/src/util/container.h
index e83c3cd37..6d836a4d5 100644
--- a/src/util/container.h
+++ b/src/util/container.h
@@ -24,7 +24,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
 #include "../exceptions.h"
 #include "../jthread/jmutex.h"
 #include "../jthread/jmutexautolock.h"
-#include "../porting.h" // For sleep_ms
+#include "../jthread/jsemaphore.h"
 #include <list>
 #include <vector>
 #include <map>
@@ -201,6 +201,12 @@ class Queue
 		++m_list_size;
 	}
 	
+	void push_front(T t)
+	{
+		m_list.push_front(t);
+		++m_list_size;
+	}
+
 	T pop_front()
 	{
 		if(m_list.empty())
@@ -247,86 +253,141 @@ template<typename T>
 class MutexedQueue
 {
 public:
+	template<typename Key, typename U, typename Caller, typename CallerData>
+	friend class RequestQueue;
+
 	MutexedQueue()
 	{
 	}
 	bool empty()
 	{
 		JMutexAutoLock lock(m_mutex);
-		return m_list.empty();
+		return (m_size.GetValue() == 0);
 	}
 	void push_back(T t)
 	{
 		JMutexAutoLock lock(m_mutex);
 		m_list.push_back(t);
+		m_size.Post();
 	}
-	T pop_front(u32 wait_time_max_ms=0)
+
+	/* this version of pop_front returns a empty element of T on timeout.
+	 * Make sure default constructor of T creates a recognizable "empty" element
+	 */
+	T pop_frontNoEx(u32 wait_time_max_ms)
 	{
-		u32 wait_time_ms = 0;
+		if (m_size.Wait(wait_time_max_ms))
+		{
+			JMutexAutoLock lock(m_mutex);
 
-		for(;;)
+			typename std::list<T>::iterator begin = m_list.begin();
+			T t = *begin;
+			m_list.erase(begin);
+			return t;
+		}
+		else
 		{
-			{
-				JMutexAutoLock lock(m_mutex);
-
-				if(!m_list.empty())
-				{
-					typename std::list<T>::iterator begin = m_list.begin();
-					T t = *begin;
-					m_list.erase(begin);
-					return t;
-				}
-
-				if(wait_time_ms >= wait_time_max_ms)
-					throw ItemNotFoundException("MutexedQueue: queue is empty");
-			}
-
-			// Wait a while before trying again
-			sleep_ms(10);
-			wait_time_ms += 10;
+			return T();
 		}
 	}
+
+	T pop_front(u32 wait_time_max_ms)
+	{
+		if (m_size.Wait(wait_time_max_ms))
+		{
+			JMutexAutoLock lock(m_mutex);
+
+			typename std::list<T>::iterator begin = m_list.begin();
+			T t = *begin;
+			m_list.erase(begin);
+			return t;
+		}
+		else
+		{
+			throw ItemNotFoundException("MutexedQueue: queue is empty");
+		}
+	}
+
+	T pop_frontNoEx()
+	{
+		m_size.Wait();
+
+		JMutexAutoLock lock(m_mutex);
+
+		typename std::list<T>::iterator begin = m_list.begin();
+		T t = *begin;
+		m_list.erase(begin);
+		return t;
+	}
+
 	T pop_back(u32 wait_time_max_ms=0)
 	{
-		u32 wait_time_ms = 0;
+		if (m_size.Wait(wait_time_max_ms))
+		{
+			JMutexAutoLock lock(m_mutex);
+
+			typename std::list<T>::iterator last = m_list.end();
+			last--;
+			T t = *last;
+			m_list.erase(last);
+			return t;
+		}
+		else
+		{
+			throw ItemNotFoundException("MutexedQueue: queue is empty");
+		}
+	}
+
+	/* this version of pop_back returns a empty element of T on timeout.
+	 * Make sure default constructor of T creates a recognizable "empty" element
+	 */
+	T pop_backNoEx(u32 wait_time_max_ms=0)
+	{
+		if (m_size.Wait(wait_time_max_ms))
+		{
+			JMutexAutoLock lock(m_mutex);
 
-		for(;;)
+			typename std::list<T>::iterator last = m_list.end();
+			last--;
+			T t = *last;
+			m_list.erase(last);
+			return t;
+		}
+		else
 		{
-			{
-				JMutexAutoLock lock(m_mutex);
-
-				if(!m_list.empty())
-				{
-					typename std::list<T>::iterator last = m_list.end();
-					last--;
-					T t = *last;
-					m_list.erase(last);
-					return t;
-				}
-
-				if(wait_time_ms >= wait_time_max_ms)
-					throw ItemNotFoundException("MutexedQueue: queue is empty");
-			}
-
-			// Wait a while before trying again
-			sleep_ms(10);
-			wait_time_ms += 10;
+			return T();
 		}
 	}
 
+	T pop_backNoEx()
+	{
+		m_size.Wait();
+
+		JMutexAutoLock lock(m_mutex);
+
+		typename std::list<T>::iterator last = m_list.end();
+		last--;
+		T t = *last;
+		m_list.erase(last);
+		return t;
+	}
+
+protected:
 	JMutex & getMutex()
 	{
 		return m_mutex;
 	}
 
+	// NEVER EVER modify the >>list<< you got by using this function!
+	// You may only modify it's content
 	std::list<T> & getList()
 	{
 		return m_list;
 	}
 
-protected:
 	JMutex m_mutex;
 	std::list<T> m_list;
+	JSemaphore m_size;
 };
 
 #endif
diff --git a/src/util/thread.h b/src/util/thread.h
index bb8e03317..8b3c33621 100644
--- a/src/util/thread.h
+++ b/src/util/thread.h
@@ -24,6 +24,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
 #include "../jthread/jthread.h"
 #include "../jthread/jmutex.h"
 #include "../jthread/jmutexautolock.h"
+#include "porting.h"
 
 template<typename T>
 class MutexedVariable
@@ -123,36 +124,38 @@ class RequestQueue
 	void add(Key key, Caller caller, CallerData callerdata,
 			ResultQueue<Key, T, Caller, CallerData> *dest)
 	{
-		JMutexAutoLock lock(m_queue.getMutex());
-		
-		/*
-			If the caller is already on the list, only update CallerData
-		*/
-		for(typename std::list< GetRequest<Key, T, Caller, CallerData> >::iterator
-				i = m_queue.getList().begin();
-				i != m_queue.getList().end(); ++i)
 		{
-			GetRequest<Key, T, Caller, CallerData> &request = *i;
-
-			if(request.key == key)
+			JMutexAutoLock lock(m_queue.getMutex());
+
+			/*
+				If the caller is already on the list, only update CallerData
+			*/
+			for(typename std::list< GetRequest<Key, T, Caller, CallerData> >::iterator
+					i = m_queue.getList().begin();
+					i != m_queue.getList().end(); ++i)
 			{
-				for(typename std::list< CallerInfo<Caller, CallerData, Key, T> >::iterator
-						i = request.callers.begin();
-						i != request.callers.end(); ++i)
+				GetRequest<Key, T, Caller, CallerData> &request = *i;
+
+				if(request.key == key)
 				{
-					CallerInfo<Caller, CallerData, Key, T> &ca = *i;
-					if(ca.caller == caller)
+					for(typename std::list< CallerInfo<Caller, CallerData, Key, T> >::iterator
+							i = request.callers.begin();
+							i != request.callers.end(); ++i)
 					{
-						ca.data = callerdata;
-						return;
+						CallerInfo<Caller, CallerData, Key, T> &ca = *i;
+						if(ca.caller == caller)
+						{
+							ca.data = callerdata;
+							return;
+						}
 					}
+					CallerInfo<Caller, CallerData, Key, T> ca;
+					ca.caller = caller;
+					ca.data = callerdata;
+					ca.dest = dest;
+					request.callers.push_back(ca);
+					return;
 				}
-				CallerInfo<Caller, CallerData, Key, T> ca;
-				ca.caller = caller;
-				ca.data = callerdata;
-				ca.dest = dest;
-				request.callers.push_back(ca);
-				return;
 			}
 		}
 
@@ -168,12 +171,17 @@ class RequestQueue
 		ca.dest = dest;
 		request.callers.push_back(ca);
 		
-		m_queue.getList().push_back(request);
+		m_queue.push_back(request);
+	}
+
+	GetRequest<Key, T, Caller, CallerData> pop(unsigned int timeout_ms)
+	{
+		return m_queue.pop_front(timeout_ms);
 	}
 
-	GetRequest<Key, T, Caller, CallerData> pop(bool wait_if_empty=false)
+	GetRequest<Key, T, Caller, CallerData> pop()
 	{
-		return m_queue.pop_front(wait_if_empty);
+		return m_queue.pop_frontNoEx();
 	}
 
 	void pushResult(GetRequest<Key, T, Caller, CallerData> req,
-- 
GitLab