00001
00002
00003 #include "cppscript"
00004 #include "dynamic/extensions.hpp"
00005 #include "thread.hpp"
00006
00007
00008 namespace
00009 {
00010 namespace Queue
00011 {
00012 var pop(var queue)
00013 {
00014 {
00015 internal::leave_apartment l;
00016 queue["Queue::num_waiting"].as<api::stack>().wait_pop();
00017 }
00018 return queue["Queue::pending"].pop_front();
00019 }
00020
00021 void post(var queue, var message)
00022 {
00023 queue["Queue::pending"].push_back(message);
00024 queue["Queue::num_waiting"].as<api::stack>().push();
00025 }
00026
00027 void wait(var queue)
00028 {
00029 var num_waiting = queue["Queue::num_waiting"];
00030 internal::leave_apartment l;
00031 num_waiting.as<api::stack>().wait_empty();
00032 }
00033 }
00034
00035
00036 namespace MessageQueue
00037 {
00038 void post(var queue, var message)
00039 {
00040 Queue::post(queue, message);
00041 queue["MessageQueue::num_done"].as<api::stack>().push();
00042 }
00043
00044 void process(var queue)
00045 {
00046 var item = queue["pop"]();
00047
00048 try
00049 {
00050 queue["consumer"](item);
00051 }
00052 catch(...)
00053 {
00054
00055 }
00056
00057 internal::leave_apartment l;
00058 queue["MessageQueue::num_done"].as<api::stack>().wait_pop();
00059 }
00060
00061 void process_thread(var queue)
00062 {
00063 while( !queue["Queue::num_waiting"].as<api::stack>().check_shutdown() )
00064 {
00065 process(queue);
00066 }
00067 }
00068
00069 void close(var queue)
00070 {
00071 queue["Queue::num_waiting"].as<api::stack>().shutdown();
00072 queue["MessageQueue::num_done"].as<api::stack>().shutdown();
00073
00074 foreach(thread, queue["threads"])
00075 thread["join"]();
00076 }
00077
00078 void wait(var queue)
00079 {
00080 Queue::wait(queue);
00081 var num_done = queue["MessageQueue::num_done"];
00082 internal::leave_apartment l;
00083 num_done.as<api::stack>().wait_empty();
00084 }
00085 }
00086 }
00087
00088
00089 var dynamic::queue()
00090 {
00091 return object("Queue").extend
00092 ("Queue::pending", list())
00093 ("post", Queue::post)
00094 ("Queue::num_waiting", create<api::stack>())
00095 ("pop", Queue::pop)
00096 ("wait", Queue::wait);
00097 }
00098
00099
00100 var dynamic::message_queue(const var & consumer, const var & num_threads)
00101 {
00102 var q = queue().extend
00103 ("process_thread", MessageQueue::process_thread)
00104 ("consumer", consumer)
00105 ("process", MessageQueue::process)
00106 ("threads", array())
00107 ("close", MessageQueue::close)
00108 ("post", MessageQueue::post)
00109 ("MessageQueue::num_done", create<api::stack>())
00110 ("wait", MessageQueue::wait);
00111
00112 for(var i = 0; i<num_threads; ++i)
00113 {
00114 q["threads"].push_back(thread(q["process_thread"]));
00115 }
00116
00117 return q;
00118 }