00001 /* Copyright (C) 2004-2006 MySQL AB 00002 00003 This program is free software; you can redistribute it and/or modify 00004 it under the terms of the GNU General Public License as published by 00005 the Free Software Foundation; either version 2 of the License, or 00006 (at your option) any later version. 00007 00008 This program is distributed in the hope that it will be useful, 00009 but WITHOUT ANY WARRANTY; without even the implied warranty of 00010 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00011 GNU General Public License for more details. 00012 00013 You should have received a copy of the GNU General Public License 00014 along with this program; if not, write to the Free Software 00015 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ 00016 00017 #include "mysql_priv.h" 00018 #include "events_priv.h" 00019 #include "events.h" 00020 #include "event_timed.h" 00021 #include "event_scheduler.h" 00022 #include "sp_head.h" 00023 00024 /* 00025 ToDo: 00026 1. Talk to Alik to get a check for configure.in for my_time_t and time_t 00027 2. Look at guardian.h|cc to see its life cycle, has similarities. 00028 */ 00029 00030 00031 /* 00032 The scheduler is implemented as class Event_scheduler. Only one instance is 00033 kept during the runtime of the server, by implementing the Singleton DP. 00034 Object instance is always there because the memory is allocated statically 00035 and initialized when the OS loader loads mysqld. This initialization is 00036 bare. Extended initialization is done during the call to 00037 Event_scheduler::init() in Events::init(). The reason for that late initialization 00038 is that some subsystems needed to boot the Scheduler are not available at 00039 earlier stages of the mysqld boot procedure. Events::init() is called in 00040 mysqld.cc . If the mysqld is started with --event-scheduler=0 then 00041 no initialization takes place and the scheduler is unavailable during this 00042 server run. 
The server should be started with --event-scheduler=1 to have
  the scheduler initialized and able to execute jobs. Starting it this way
  always implies that job execution will start immediately. If the server
  is started with --event-scheduler=2 then the scheduler is started in suspended
  state. The default, if --event-scheduler is not specified, is 2.

  The scheduler only manages execution of the events. Their creation,
  alteration and deletion are delegated to other routines found in event.cc .
  These routines interact with the scheduler :
  - CREATE EVENT -> Event_scheduler::create_event()
  - ALTER EVENT  -> Event_scheduler::update_event()
  - DROP EVENT   -> Event_scheduler::drop_event()

  There is one mutex in the single Event_scheduler object which controls
  the simultaneous access to the object's invariants. Using one lock makes
  it easy to follow the workflow. This mutex is LOCK_scheduler_data. It is
  initialized in Event_scheduler::init(), which in turn is called by the
  Facade class Events in event.cc, coming from init_thread_environment() from
  mysqld.cc -> no concurrency at this point. It's destroyed in
  Events::destroy_mutexes() called from clean_up_mutexes() in mysqld.cc .

  The full initialization is done in Event_scheduler::init() called from
  Events::init(). It's done before any requests come in, so this is a
  guarantee for not having concurrency.

  The scheduler is started with Event_scheduler::start() and stopped with
  Event_scheduler::stop(). When the scheduler starts it loads all events
  from the mysql.event table. Unfortunately, there is a race condition between
  the event disk management functions and the scheduler ones
  (add/replace/drop_event & load_events_from_db()), because the operations
  do not happen under one global lock but the disk operations are guarded
  by the MYISAM lock on mysql.event.
At the same time, the queue operations
  are guarded by LOCK_scheduler_data. If the scheduler is start()-ed during
  server startup and stop()-ed during server shutdown (in Events::shutdown()
  called by kill_server() in mysqld.cc) these races do not exist.

  Since the user may want to temporarily inhibit execution of events the
  scheduler can be suspended and then it can be forced to resume its
  operations. The API call to perform these is
  Event_scheduler::suspend_or_resume(enum enum_suspend_or_resume) .
  When the scheduler is suspended the main scheduler thread, which ATM
  happens to have thread_id 1, blocks on the condition COND_suspend_or_resume.
  When this condition is signalled for the reverse operation the main
  scheduler loop continues to roll and execute events.

  When the scheduler is suspended all add/replace/drop_event() operations
  work as expected and they modify the queue, but no event execution takes
  place.

  In contrast to the previous scheduler implementation, found in
  event_executor.cc, the start, shutdown, suspend and resume are synchronous
  operations. As a whole all operations are synchronized and no busy waits
  are used except in stop_all_running_events(), which waits until all
  running event worker threads have finished. It would have been nice to
  use a condition on which this method would wait and which the last thread
  to finish would signal, but this implies subclassing THD.

  The scheduler does not keep a counter of how many event worker threads are
  running at any specific moment, because this would duplicate functionality
  already existing in the server. Namely, all THDs are registered in the
  global `threads` array. THD has a member variable system_thread which
  identifies the type of thread. Connection threads are NON_SYSTEM_THREAD,
  all others have their own enum value.
Important for the scheduler are
  SYSTEM_THREAD_EVENT_SCHEDULER and SYSTEM_THREAD_EVENT_WORKER.

  Class THD subclasses class ilink, which is the linked list of all threads.
  When a THD instance is destroyed it is removed from `threads`, thus
  no manual intervention is needed. Registering, on the contrary, is manual
  and done with threads.append() . If we had a subclass of THD, for instance
  THD_scheduler_worker, traversing the `threads` array every time to see
  how many events we have and whether the scheduler is shutting down would
  take much time and lead to a deadlock. stop_all_running_events() is called
  under LOCK_scheduler_data. If THD_scheduler_worker were aware of
  the single Event_scheduler instance it would try to check
  Event_scheduler::state, but for this it would need to acquire
  LOCK_scheduler_data => deadlock. Thus stop_all_running_events() uses a
  busy wait.

  DROP DATABASE DDL should drop all events defined in a specific schema.
  DROP USER should also drop all events whose definer is the user being
  dropped (this one is not addressed at the moment but a hook exists). For
  these specific needs Event_scheduler::drop_matching_events() is
  implemented. It expects a callback to be applied to every object in
  the queue. Thus events that match a specific schema or user will be
  removed from the queue. The exposed interface is :
  - Event_scheduler::drop_schema_events()
  - Event_scheduler::drop_user_events()

  This bulk dropping happens under LOCK_scheduler_data, thus no two or
  more threads can execute it in parallel. However, DROP DATABASE is also
  synchronized, currently, in the server, thus this does not impact the
  overall performance. In addition, DROP DATABASE is not an often
  executed DDL.

  Though the interface to the scheduler is only through the public methods
  of class Event_scheduler, there are currently a few functions which are
  used during its operations. Namely :
  - static evex_print_warnings()
    After every event execution all errors/warnings are dumped, so the user
    can see in case of a problem what the problem was.

  - static init_event_thread()
    This function is used by both event_scheduler_thread() and
    event_worker_thread(). It initializes the THD structure. The
    initialization looks pretty similar to the one in slave.cc done for the
    replication threads. However, despite the similarities it cannot be
    factored out to have one routine.

  - static event_scheduler_thread()
    Because our way to register functions to be used by the threading library
    does not allow usage of static methods this function is used to start the
    scheduler in it. It does THD initialization and then calls
    Event_scheduler::run().

  - static event_worker_thread()
    For the reason already stated (methods cannot be used as thread
    functions), this function executes the worker threads.

  The execution of events is, to some extent, synchronized to inhibit race
  conditions when Event_timed::thread_id is being updated with the thread_id of
  the THD in which the event is being executed. The thread_id is in the
  Event_timed object because we need to be able to quickly kill a specific
  event during ALTER/DROP EVENT without traversing the global `threads` array.
  However, this makes the scheduler's code more complicated. The event worker
  thread is started by Event_timed::spawn_now(), which in turn calls
  pthread_create(). The thread_id which will be assigned in init_event_thread
  is not known in advance, thus the registering takes place in
  event_worker_thread().
This registering has to be synchronized under
  LOCK_scheduler_data, so no kill_event() on an object in
  replace_event/drop_event/drop_matching_events() could take place.

  This synchronization is done through class Worker_thread_param that is
  local to this file. Event_scheduler::execute_top() is called under
  LOCK_scheduler_data. This method :
  1. Creates an instance of Worker_thread_param on the stack
  2. Locks Worker_thread_param::LOCK_started
  3. Calls Event_timed::spawn_now() which in turn creates a new thread.
  4. Waits on Worker_thread_param::COND_started till the
     worker thread sends the signal. The code is spurious wake-up safe because
     Worker_thread_param::started is checked.
  5. The worker thread initializes its THD, then sets Event_timed::thread_id,
     sets Worker_thread_param::started to TRUE and sends back
     Worker_thread_param::COND_started. From this moment on, the event
     is being executed and could be killed by using Event_timed::thread_id.
     When Event_timed::spawn_thread_finish() is called in the worker thread,
     it sets thread_id to 0. From this moment on, the worker thread should not
     touch the Event_timed instance.


  The life-cycle of the scheduler is a finite state automaton (FSA).
  enum enum_state Event_scheduler::state keeps the state of the scheduler.

  The states are:

  |---UNINITIALIZED
  |
  |                                            |------------------> IN_SHUTDOWN
  --> INITIALIZED -> COMMENCING ---> RUNNING ----------|
       ^ ^               |            | ^              |
       | |- CANTSTART <--|            | |- SUSPENDED <-|
       |______________________________|

  - UNINITIALIZED :The object is created and only the mutex is initialized
  - INITIALIZED   :All member variables are initialized
  - COMMENCING    :The scheduler is starting, no other attempt to start
                   should succeed before the state is back to INITIALIZED.
  - CANTSTART     :Set by the ::run() method in case it can't start for some
                   reason. In this case the connection thread that tries to
                   start the scheduler sees that some error has occurred and
                   returns an error to the user. Finally, the connection
                   thread sets the state to INITIALIZED, so further attempts
                   to start the scheduler could be made.
  - RUNNING       :The scheduler is running. New events could be added,
                   dropped, altered. The scheduler could be stopped.
  - SUSPENDED     :Like RUNNING but execution of events does not take place.
                   Operations on the memory queue are possible.
  - IN_SHUTDOWN   :The scheduler is shutting down, due to a request made by
                   setting the global event_scheduler to 0/FALSE, or because
                   of a KILL command sent by a user to the master thread.

  In every method the macros LOCK_SCHEDULER_DATA() and UNLOCK_SCHEDULER_DATA()
  are used for (un)locking purposes. They are used to save the programmer
  from typing every time
    lock_data(__FUNCTION__, __LINE__);
  All locking goes through Event_scheduler::lock_data() and ::unlock_data().
  These two functions record in variables where LOCK_scheduler_data was
  last locked and last unlocked (two different variables). In a
  multithreaded environment these values make no sense in some cases, but
  they are useful for inspecting deadlocks while the server is still running
  and without having the server debug log turned on.

  The same strategy is used for condition variables.
  Event_scheduler::cond_wait() is invoked from all places with a parameter
  of type enum enum_cond_vars. In this manner, it's possible to inspect
  on which condition the last call to cond_wait() was waiting. If the server
  was started with debug trace switched on, the trace file also holds
  information about the condition variables used.
00239 */ 00240 00241 #ifdef __GNUC__ 00242 #if __GNUC__ >= 2 00243 #define SCHED_FUNC __FUNCTION__ 00244 #endif 00245 #else 00246 #define SCHED_FUNC "<unknown>" 00247 #endif 00248 00249 #define LOCK_SCHEDULER_DATA() lock_data(SCHED_FUNC, __LINE__) 00250 #define UNLOCK_SCHEDULER_DATA() unlock_data(SCHED_FUNC, __LINE__) 00251 00252 00253 #ifndef DBUG_OFF 00254 static 00255 LEX_STRING states_names[] = 00256 { 00257 {(char*) STRING_WITH_LEN("UNINITIALIZED")}, 00258 {(char*) STRING_WITH_LEN("INITIALIZED")}, 00259 {(char*) STRING_WITH_LEN("COMMENCING")}, 00260 {(char*) STRING_WITH_LEN("CANTSTART")}, 00261 {(char*) STRING_WITH_LEN("RUNNING")}, 00262 {(char*) STRING_WITH_LEN("SUSPENDED")}, 00263 {(char*) STRING_WITH_LEN("IN_SHUTDOWN")} 00264 }; 00265 #endif 00266 00267 00268 Event_scheduler 00269 Event_scheduler::singleton; 00270 00271 00272 const char * const 00273 Event_scheduler::cond_vars_names[Event_scheduler::COND_LAST] = 00274 { 00275 "new work", 00276 "started or stopped", 00277 "suspend or resume" 00278 }; 00279 00280 00281 class Worker_thread_param 00282 { 00283 public: 00284 Event_timed *et; 00285 pthread_mutex_t LOCK_started; 00286 pthread_cond_t COND_started; 00287 bool started; 00288 00289 Worker_thread_param(Event_timed *etn):et(etn), started(FALSE) 00290 { 00291 pthread_mutex_init(&LOCK_started, MY_MUTEX_INIT_FAST); 00292 pthread_cond_init(&COND_started, NULL); 00293 } 00294 00295 ~Worker_thread_param() 00296 { 00297 pthread_mutex_destroy(&LOCK_started); 00298 pthread_cond_destroy(&COND_started); 00299 } 00300 }; 00301 00302 00303 /* 00304 Compares the execute_at members of 2 Event_timed instances. 00305 Used as callback for the prioritized queue when shifting 00306 elements inside. 
00307 00308 SYNOPSIS 00309 event_timed_compare_q() 00310 00311 vptr - not used (set it to NULL) 00312 a - first Event_timed object 00313 b - second Event_timed object 00314 00315 RETURN VALUE 00316 -1 - a->execute_at < b->execute_at 00317 0 - a->execute_at == b->execute_at 00318 1 - a->execute_at > b->execute_at 00319 00320 NOTES 00321 execute_at.second_part is not considered during comparison 00322 */ 00323 00324 static int 00325 event_timed_compare_q(void *vptr, byte* a, byte *b) 00326 { 00327 return my_time_compare(&((Event_timed *)a)->execute_at, 00328 &((Event_timed *)b)->execute_at); 00329 } 00330 00331 00332 /* 00333 Prints the stack of infos, warnings, errors from thd to 00334 the console so it can be fetched by the logs-into-tables and 00335 checked later. 00336 00337 SYNOPSIS 00338 evex_print_warnings 00339 thd - thread used during the execution of the event 00340 et - the event itself 00341 */ 00342 00343 static void 00344 evex_print_warnings(THD *thd, Event_timed *et) 00345 { 00346 MYSQL_ERROR *err; 00347 DBUG_ENTER("evex_print_warnings"); 00348 if (!thd->warn_list.elements) 00349 DBUG_VOID_RETURN; 00350 00351 char msg_buf[10 * STRING_BUFFER_USUAL_SIZE]; 00352 char prefix_buf[5 * STRING_BUFFER_USUAL_SIZE]; 00353 String prefix(prefix_buf, sizeof(prefix_buf), system_charset_info); 00354 prefix.length(0); 00355 prefix.append("SCHEDULER: ["); 00356 00357 append_identifier(thd, &prefix, et->definer_user.str, et->definer_user.length); 00358 prefix.append('@'); 00359 append_identifier(thd, &prefix, et->definer_host.str, et->definer_host.length); 00360 prefix.append("][", 2); 00361 append_identifier(thd,&prefix, et->dbname.str, et->dbname.length); 00362 prefix.append('.'); 00363 append_identifier(thd,&prefix, et->name.str, et->name.length); 00364 prefix.append("] ", 2); 00365 00366 List_iterator_fast<MYSQL_ERROR> it(thd->warn_list); 00367 while ((err= it++)) 00368 { 00369 String err_msg(msg_buf, sizeof(msg_buf), system_charset_info); 00370 /* set it to 0 or we 
start adding at the end. That's the trick ;) */ 00371 err_msg.length(0); 00372 err_msg.append(prefix); 00373 err_msg.append(err->msg, strlen(err->msg), system_charset_info); 00374 err_msg.append("]"); 00375 DBUG_ASSERT(err->level < 3); 00376 (sql_print_message_handlers[err->level])("%*s", err_msg.length(), 00377 err_msg.c_ptr()); 00378 } 00379 DBUG_VOID_RETURN; 00380 } 00381 00382 00383 /* 00384 Inits an scheduler thread handler, both the main and a worker 00385 00386 SYNOPSIS 00387 init_event_thread() 00388 thd - the THD of the thread. Has to be allocated by the caller. 00389 00390 NOTES 00391 1. The host of the thead is my_localhost 00392 2. thd->net is initted with NULL - no communication. 00393 00394 RETURN VALUE 00395 0 OK 00396 -1 Error 00397 */ 00398 00399 static int 00400 init_event_thread(THD** t, enum enum_thread_type thread_type) 00401 { 00402 THD *thd= *t; 00403 thd->thread_stack= (char*)t; // remember where our stack is 00404 DBUG_ENTER("init_event_thread"); 00405 thd->client_capabilities= 0; 00406 thd->security_ctx->master_access= 0; 00407 thd->security_ctx->db_access= 0; 00408 thd->security_ctx->host_or_ip= (char*)my_localhost; 00409 my_net_init(&thd->net, 0); 00410 thd->net.read_timeout= slave_net_timeout; 00411 thd->slave_thread= 0; 00412 thd->options|= OPTION_AUTO_IS_NULL; 00413 thd->client_capabilities|= CLIENT_MULTI_RESULTS; 00414 thd->real_id=pthread_self(); 00415 VOID(pthread_mutex_lock(&LOCK_thread_count)); 00416 thd->thread_id= thread_id++; 00417 threads.append(thd); 00418 thread_count++; 00419 thread_running++; 00420 VOID(pthread_mutex_unlock(&LOCK_thread_count)); 00421 00422 if (init_thr_lock() || thd->store_globals()) 00423 { 00424 thd->cleanup(); 00425 DBUG_RETURN(-1); 00426 } 00427 00428 #if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__) 00429 sigset_t set; 00430 VOID(sigemptyset(&set)); // Get mask in use 00431 VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals)); 00432 #endif 00433 00434 /* 00435 Guarantees that we 
will see the thread in SHOW PROCESSLIST though its 00436 vio is NULL. 00437 */ 00438 thd->system_thread= thread_type; 00439 00440 thd->proc_info= "Initialized"; 00441 thd->version= refresh_version; 00442 thd->set_time(); 00443 00444 DBUG_RETURN(0); 00445 } 00446 00447 00448 /* 00449 Inits the main scheduler thread and then calls Event_scheduler::run() 00450 of arg. 00451 00452 SYNOPSIS 00453 event_scheduler_thread() 00454 arg void* ptr to Event_scheduler 00455 00456 NOTES 00457 1. The host of the thead is my_localhost 00458 2. thd->net is initted with NULL - no communication. 00459 3. The reason to have a proxy function is that it's not possible to 00460 use a method as function to be executed in a spawned thread: 00461 - our pthread_hander_t macro uses extern "C" 00462 - separating thread setup from the real execution loop is also to be 00463 considered good. 00464 00465 RETURN VALUE 00466 0 OK 00467 */ 00468 00469 pthread_handler_t 00470 event_scheduler_thread(void *arg) 00471 { 00472 /* needs to be first for thread_stack */ 00473 THD *thd= NULL; 00474 Event_scheduler *scheduler= (Event_scheduler *) arg; 00475 00476 DBUG_ENTER("event_scheduler_thread"); 00477 00478 my_thread_init(); 00479 pthread_detach_this_thread(); 00480 00481 /* note that constructor of THD uses DBUG_ ! */ 00482 if (!(thd= new THD) || init_event_thread(&thd, SYSTEM_THREAD_EVENT_SCHEDULER)) 00483 { 00484 sql_print_error("SCHEDULER: Cannot init manager event thread."); 00485 scheduler->report_error_during_start(); 00486 } 00487 else 00488 { 00489 thd->security_ctx->set_user((char*)"event_scheduler"); 00490 00491 sql_print_information("SCHEDULER: Manager thread booting"); 00492 if (Event_scheduler::check_system_tables(thd)) 00493 scheduler->report_error_during_start(); 00494 else 00495 scheduler->run(thd); 00496 00497 /* 00498 NOTE: Don't touch `scheduler` after this point because we have notified 00499 the 00500 thread which shuts us down that we have finished cleaning. 
In this 00501 very moment a new scheduler thread could be started and a crash is 00502 not welcome. 00503 */ 00504 } 00505 00506 /* 00507 If we cannot create THD then don't decrease because we haven't touched 00508 thread_count and thread_running in init_event_thread() which was never 00509 called. In init_event_thread() thread_count and thread_running are 00510 always increased even in the case the method returns an error. 00511 */ 00512 if (thd) 00513 { 00514 thd->proc_info= "Clearing"; 00515 DBUG_ASSERT(thd->net.buff != 0); 00516 net_end(&thd->net); 00517 pthread_mutex_lock(&LOCK_thread_count); 00518 thread_count--; 00519 thread_running--; 00520 delete thd; 00521 pthread_mutex_unlock(&LOCK_thread_count); 00522 } 00523 my_thread_end(); 00524 DBUG_RETURN(0); // Can't return anything here 00525 } 00526 00527 00528 /* 00529 Function that executes an event in a child thread. Setups the 00530 environment for the event execution and cleans after that. 00531 00532 SYNOPSIS 00533 event_worker_thread() 00534 arg The Event_timed object to be processed 00535 00536 RETURN VALUE 00537 0 OK 00538 */ 00539 00540 pthread_handler_t 00541 event_worker_thread(void *arg) 00542 { 00543 THD *thd; /* needs to be first for thread_stack */ 00544 Worker_thread_param *param= (Worker_thread_param *) arg; 00545 Event_timed *event= param->et; 00546 int ret; 00547 bool startup_error= FALSE; 00548 Security_context *save_ctx; 00549 /* this one is local and not needed after exec */ 00550 Security_context security_ctx; 00551 00552 DBUG_ENTER("event_worker_thread"); 00553 DBUG_PRINT("enter", ("event=[%s.%s]", event->dbname.str, event->name.str)); 00554 00555 my_thread_init(); 00556 pthread_detach_this_thread(); 00557 00558 if (!(thd= new THD) || init_event_thread(&thd, SYSTEM_THREAD_EVENT_WORKER)) 00559 { 00560 sql_print_error("SCHEDULER: Startup failure."); 00561 startup_error= TRUE; 00562 event->spawn_thread_finish(thd); 00563 } 00564 else 00565 event->set_thread_id(thd->thread_id); 00566 00567 
DBUG_PRINT("info", ("master_access=%d db_access=%d", 00568 thd->security_ctx->master_access, thd->security_ctx->db_access)); 00569 /* 00570 If we don't change it before we send the signal back, then an intermittent 00571 DROP EVENT will take LOCK_scheduler_data and try to kill this thread, because 00572 event->thread_id is already real. However, because thd->security_ctx->user 00573 is not initialized then a crash occurs in kill_one_thread(). Thus, we have 00574 to change the context before sending the signal. We are under 00575 LOCK_scheduler_data being held by Event_scheduler::run() -> ::execute_top(). 00576 */ 00577 change_security_context(thd, event->definer_user, event->definer_host, 00578 event->dbname, &security_ctx, &save_ctx); 00579 DBUG_PRINT("info", ("master_access=%d db_access=%d", 00580 thd->security_ctx->master_access, thd->security_ctx->db_access)); 00581 00582 /* Signal the scheduler thread that we have started successfully */ 00583 pthread_mutex_lock(¶m->LOCK_started); 00584 param->started= TRUE; 00585 pthread_cond_signal(¶m->COND_started); 00586 pthread_mutex_unlock(¶m->LOCK_started); 00587 00588 if (!startup_error) 00589 { 00590 thd->init_for_queries(); 00591 thd->enable_slow_log= TRUE; 00592 00593 event->set_thread_id(thd->thread_id); 00594 sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu", 00595 event->dbname.str, event->name.str, 00596 event->definer.str, thd->thread_id); 00597 00598 ret= event->execute(thd, thd->mem_root); 00599 evex_print_warnings(thd, event); 00600 sql_print_information("SCHEDULER: [%s.%s of %s] executed. 
RetCode=%d", 00601 event->dbname.str, event->name.str, 00602 event->definer.str, ret); 00603 if (ret == EVEX_COMPILE_ERROR) 00604 sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s", 00605 event->dbname.str, event->name.str, 00606 event->definer.str); 00607 else if (ret == EVEX_MICROSECOND_UNSUP) 00608 sql_print_information("SCHEDULER: MICROSECOND is not supported"); 00609 00610 DBUG_PRINT("info", ("master_access=%d db_access=%d", 00611 thd->security_ctx->master_access, thd->security_ctx->db_access)); 00612 00613 /* If true is returned, we are expected to free it */ 00614 if (event->spawn_thread_finish(thd)) 00615 { 00616 DBUG_PRINT("info", ("Freeing object pointer")); 00617 delete event; 00618 } 00619 } 00620 00621 if (thd) 00622 { 00623 thd->proc_info= "Clearing"; 00624 DBUG_ASSERT(thd->net.buff != 0); 00625 /* 00626 Free it here because net.vio is NULL for us => THD::~THD will check it 00627 and won't call net_end(&net); See also replication code. 00628 */ 00629 net_end(&thd->net); 00630 DBUG_PRINT("info", ("Worker thread %lu exiting", thd->thread_id)); 00631 VOID(pthread_mutex_lock(&LOCK_thread_count)); 00632 thread_count--; 00633 thread_running--; 00634 delete thd; 00635 VOID(pthread_mutex_unlock(&LOCK_thread_count)); 00636 } 00637 00638 my_thread_end(); 00639 DBUG_RETURN(0); // Can't return anything here 00640 } 00641 00642 00643 /* 00644 Constructor of class Event_scheduler. 00645 00646 SYNOPSIS 00647 Event_scheduler::Event_scheduler() 00648 */ 00649 00650 Event_scheduler::Event_scheduler() 00651 :state(UNINITIALIZED), start_scheduler_suspended(FALSE), 00652 thread_id(0), mutex_last_locked_at_line(0), 00653 mutex_last_unlocked_at_line(0), mutex_last_locked_in_func(""), 00654 mutex_last_unlocked_in_func(""), cond_waiting_on(COND_NONE), 00655 mutex_scheduler_data_locked(FALSE) 00656 { 00657 } 00658 00659 00660 /* 00661 Returns the singleton instance of the class. 
00662 00663 SYNOPSIS 00664 Event_scheduler::get_instance() 00665 00666 RETURN VALUE 00667 address 00668 */ 00669 00670 Event_scheduler* 00671 Event_scheduler::get_instance() 00672 { 00673 DBUG_ENTER("Event_scheduler::get_instance"); 00674 DBUG_RETURN(&singleton); 00675 } 00676 00677 00678 /* 00679 The implementation of full-fledged initialization. 00680 00681 SYNOPSIS 00682 Event_scheduler::init() 00683 00684 RETURN VALUE 00685 FALSE OK 00686 TRUE Error 00687 */ 00688 00689 bool 00690 Event_scheduler::init() 00691 { 00692 int i= 0; 00693 bool ret= FALSE; 00694 DBUG_ENTER("Event_scheduler::init"); 00695 DBUG_PRINT("enter", ("this=%p", this)); 00696 00697 LOCK_SCHEDULER_DATA(); 00698 for (;i < COND_LAST; i++) 00699 if (pthread_cond_init(&cond_vars[i], NULL)) 00700 { 00701 sql_print_error("SCHEDULER: Unable to initalize conditions"); 00702 ret= TRUE; 00703 goto end; 00704 } 00705 00706 /* init memory root */ 00707 init_alloc_root(&scheduler_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); 00708 00709 if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/, 00710 event_timed_compare_q, NULL, 30 /*auto_extent*/)) 00711 { 00712 sql_print_error("SCHEDULER: Can't initialize the execution queue"); 00713 ret= TRUE; 00714 goto end; 00715 } 00716 00717 if (sizeof(my_time_t) != sizeof(time_t)) 00718 { 00719 sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ." 00720 "The scheduler may not work correctly. Stopping."); 00721 DBUG_ASSERT(0); 00722 ret= TRUE; 00723 goto end; 00724 } 00725 00726 state= INITIALIZED; 00727 end: 00728 UNLOCK_SCHEDULER_DATA(); 00729 DBUG_RETURN(ret); 00730 } 00731 00732 00733 /* 00734 Frees all memory allocated by the scheduler object. 
00735 00736 SYNOPSIS 00737 Event_scheduler::destroy() 00738 00739 RETURN VALUE 00740 FALSE OK 00741 TRUE Error 00742 */ 00743 00744 void 00745 Event_scheduler::destroy() 00746 { 00747 DBUG_ENTER("Event_scheduler"); 00748 00749 LOCK_SCHEDULER_DATA(); 00750 switch (state) { 00751 case UNINITIALIZED: 00752 break; 00753 case INITIALIZED: 00754 delete_queue(&queue); 00755 free_root(&scheduler_root, MYF(0)); 00756 int i; 00757 for (i= 0; i < COND_LAST; i++) 00758 pthread_cond_destroy(&cond_vars[i]); 00759 state= UNINITIALIZED; 00760 break; 00761 default: 00762 sql_print_error("SCHEDULER: Destroying while state is %d", state); 00763 /* I trust my code but ::safe() > ::sorry() */ 00764 DBUG_ASSERT(0); 00765 break; 00766 } 00767 UNLOCK_SCHEDULER_DATA(); 00768 00769 DBUG_VOID_RETURN; 00770 } 00771 00772 00773 /* 00774 Creates an event in the scheduler queue 00775 00776 SYNOPSIS 00777 Event_scheduler::create_event() 00778 et The event to add 00779 check_existence Whether to check if already loaded. 00780 00781 RETURN VALUE 00782 OP_OK OK or scheduler not working 00783 OP_LOAD_ERROR Error during loading from disk 00784 */ 00785 00786 enum Event_scheduler::enum_error_code 00787 Event_scheduler::create_event(THD *thd, Event_timed *et, bool check_existence) 00788 { 00789 enum enum_error_code res; 00790 Event_timed *et_new; 00791 DBUG_ENTER("Event_scheduler::create_event"); 00792 DBUG_PRINT("enter", ("thd=%p et=%p lock=%p",thd,et,&LOCK_scheduler_data)); 00793 00794 LOCK_SCHEDULER_DATA(); 00795 if (!is_running_or_suspended()) 00796 { 00797 DBUG_PRINT("info", ("scheduler not running but %d. 
doing nothing", state)); 00798 UNLOCK_SCHEDULER_DATA(); 00799 DBUG_RETURN(OP_OK); 00800 } 00801 if (check_existence && find_event(et, FALSE)) 00802 { 00803 res= OP_ALREADY_EXISTS; 00804 goto end; 00805 } 00806 00807 /* We need to load the event on scheduler_root */ 00808 if (!(res= load_named_event(thd, et, &et_new))) 00809 { 00810 queue_insert_safe(&queue, (byte *) et_new); 00811 DBUG_PRINT("info", ("Sending COND_new_work")); 00812 pthread_cond_signal(&cond_vars[COND_new_work]); 00813 } 00814 else if (res == OP_DISABLED_EVENT) 00815 res= OP_OK; 00816 end: 00817 UNLOCK_SCHEDULER_DATA(); 00818 DBUG_RETURN(res); 00819 } 00820 00821 00822 /* 00823 Drops an event from the scheduler queue 00824 00825 SYNOPSIS 00826 Event_scheduler::drop_event() 00827 etn The event to drop 00828 state Wait the event or kill&drop 00829 00830 RETURN VALUE 00831 FALSE OK (replaced or scheduler not working) 00832 TRUE Failure 00833 */ 00834 00835 bool 00836 Event_scheduler::drop_event(THD *thd, Event_timed *et) 00837 { 00838 int res; 00839 Event_timed *et_old; 00840 DBUG_ENTER("Event_scheduler::drop_event"); 00841 DBUG_PRINT("enter", ("thd=%p et=%p lock=%p",thd,et,&LOCK_scheduler_data)); 00842 00843 LOCK_SCHEDULER_DATA(); 00844 if (!is_running_or_suspended()) 00845 { 00846 DBUG_PRINT("info", ("scheduler not running but %d. doing nothing", state)); 00847 UNLOCK_SCHEDULER_DATA(); 00848 DBUG_RETURN(OP_OK); 00849 } 00850 00851 if (!(et_old= find_event(et, TRUE))) 00852 DBUG_PRINT("info", ("No such event found, probably DISABLED")); 00853 00854 UNLOCK_SCHEDULER_DATA(); 00855 00856 /* See comments in ::replace_event() why this is split in two parts. */ 00857 if (et_old) 00858 { 00859 switch ((res= et_old->kill_thread(thd))) { 00860 case EVEX_CANT_KILL: 00861 /* Don't delete but continue */ 00862 et_old->flags |= EVENT_FREE_WHEN_FINISHED; 00863 break; 00864 case 0: 00865 /* 00866 kill_thread() waits till the spawned thread finishes after it's 00867 killed. 
Hence, we delete here memory which is no more referenced from 00868 a running thread. 00869 */ 00870 delete et_old; 00871 /* 00872 We don't signal COND_new_work here because: 00873 1. Even if the dropped event is on top of the queue this will not 00874 move another one to be executed before the time the one on the 00875 top (but could be at the same second as the dropped one) 00876 2. If this was the last event on the queue, then pthread_cond_timedwait 00877 in ::run() will finish and then see that the queue is empty and 00878 call cond_wait(). Hence, no need to interrupt the blocked 00879 ::run() thread. 00880 */ 00881 break; 00882 default: 00883 sql_print_error("SCHEDULER: Got unexpected error %d", res); 00884 DBUG_ASSERT(0); 00885 } 00886 } 00887 00888 DBUG_RETURN(FALSE); 00889 } 00890 00891 00892 /* 00893 Updates an event from the scheduler queue 00894 00895 SYNOPSIS 00896 Event_scheduler::replace_event() 00897 et The event to replace(add) into the queue 00898 state Async or sync stopping 00899 00900 RETURN VALUE 00901 OP_OK OK or scheduler not working 00902 OP_LOAD_ERROR Error during loading from disk 00903 OP_ALREADY_EXISTS Event already in the queue 00904 */ 00905 00906 enum Event_scheduler::enum_error_code 00907 Event_scheduler::update_event(THD *thd, Event_timed *et, 00908 LEX_STRING *new_schema, 00909 LEX_STRING *new_name) 00910 { 00911 enum enum_error_code res; 00912 Event_timed *et_old, *et_new= NULL; 00913 LEX_STRING old_schema, old_name; 00914 00915 LINT_INIT(old_schema.str); 00916 LINT_INIT(old_schema.length); 00917 LINT_INIT(old_name.str); 00918 LINT_INIT(old_name.length); 00919 00920 DBUG_ENTER("Event_scheduler::update_event"); 00921 DBUG_PRINT("enter", ("thd=%p et=%p et=[%s.%s] lock=%p", 00922 thd, et, et->dbname.str, et->name.str, &LOCK_scheduler_data)); 00923 00924 LOCK_SCHEDULER_DATA(); 00925 if (!is_running_or_suspended()) 00926 { 00927 DBUG_PRINT("info", ("scheduler not running but %d. 
doing nothing", state)); 00928 UNLOCK_SCHEDULER_DATA(); 00929 DBUG_RETURN(OP_OK); 00930 } 00931 00932 if (!(et_old= find_event(et, TRUE))) 00933 DBUG_PRINT("info", ("%s.%s not found cached, probably was DISABLED", 00934 et->dbname.str, et->name.str)); 00935 00936 if (new_schema && new_name) 00937 { 00938 old_schema= et->dbname; 00939 old_name= et->name; 00940 et->dbname= *new_schema; 00941 et->name= *new_name; 00942 } 00943 /* 00944 We need to load the event (it's strings but on the object itself) 00945 on scheduler_root. et_new could be NULL : 00946 1. Error occured 00947 2. If the replace is DISABLED, we don't load it into the queue. 00948 */ 00949 if (!(res= load_named_event(thd, et, &et_new))) 00950 { 00951 queue_insert_safe(&queue, (byte *) et_new); 00952 DBUG_PRINT("info", ("Sending COND_new_work")); 00953 pthread_cond_signal(&cond_vars[COND_new_work]); 00954 } 00955 else if (res == OP_DISABLED_EVENT) 00956 res= OP_OK; 00957 00958 if (new_schema && new_name) 00959 { 00960 et->dbname= old_schema; 00961 et->name= old_name; 00962 } 00963 00964 UNLOCK_SCHEDULER_DATA(); 00965 /* 00966 Andrey: Is this comment still truthful ??? 00967 00968 We don't move this code above because a potential kill_thread will call 00969 THD::awake(). Which in turn will try to acqure mysys_var->current_mutex, 00970 which is LOCK_scheduler_data on which the COND_new_work in ::run() locks. 00971 Hence, we try to acquire a lock which we have already acquired and we run 00972 into an assert. Holding LOCK_scheduler_data however is not needed because 00973 we don't touch any invariant of the scheduler anymore. ::drop_event() does 00974 the same. 00975 */ 00976 if (et_old) 00977 { 00978 switch (et_old->kill_thread(thd)) { 00979 case EVEX_CANT_KILL: 00980 /* Don't delete but continue */ 00981 et_old->flags |= EVENT_FREE_WHEN_FINISHED; 00982 break; 00983 case 0: 00984 /* 00985 kill_thread() waits till the spawned thread finishes after it's 00986 killed. 
Hence, we delete here memory which is no more referenced from 00987 a running thread. 00988 */ 00989 delete et_old; 00990 /* 00991 We don't signal COND_new_work here because: 00992 1. Even if the dropped event is on top of the queue this will not 00993 move another one to be executed before the time the one on the 00994 top (but could be at the same second as the dropped one) 00995 2. If this was the last event on the queue, then pthread_cond_timedwait 00996 in ::run() will finish and then see that the queue is empty and 00997 call cond_wait(). Hence, no need to interrupt the blocked 00998 ::run() thread. 00999 */ 01000 break; 01001 default: 01002 DBUG_ASSERT(0); 01003 } 01004 } 01005 01006 DBUG_RETURN(res); 01007 } 01008 01009 01010 /* 01011 Searches for an event in the scheduler queue 01012 01013 SYNOPSIS 01014 Event_scheduler::find_event() 01015 etn The event to find 01016 comparator The function to use for comparing 01017 remove_from_q If found whether to remove from the Q 01018 01019 RETURN VALUE 01020 NULL Not found 01021 otherwise Address 01022 01023 NOTE 01024 The caller should do the locking also the caller is responsible for 01025 actual signalling in case an event is removed from the queue 01026 (signalling COND_new_work for instance). 
01027 */ 01028 01029 Event_timed * 01030 Event_scheduler::find_event(Event_timed *etn, bool remove_from_q) 01031 { 01032 uint i; 01033 DBUG_ENTER("Event_scheduler::find_event"); 01034 01035 for (i= 0; i < queue.elements; ++i) 01036 { 01037 Event_timed *et= (Event_timed *) queue_element(&queue, i); 01038 DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", etn->dbname.str, etn->name.str, 01039 et->dbname.str, et->name.str)); 01040 if (event_timed_identifier_equal(etn, et)) 01041 { 01042 if (remove_from_q) 01043 queue_remove(&queue, i); 01044 DBUG_RETURN(et); 01045 } 01046 } 01047 01048 DBUG_RETURN(NULL); 01049 } 01050 01051 01052 /* 01053 Drops all events from the in-memory queue and disk that match 01054 certain pattern evaluated by a comparator function 01055 01056 SYNOPSIS 01057 Event_scheduler::drop_matching_events() 01058 thd THD 01059 pattern A pattern string 01060 comparator The function to use for comparing 01061 01062 RETURN VALUE 01063 -1 Scheduler not working 01064 >=0 Number of dropped events 01065 01066 NOTE 01067 Expected is the caller to acquire lock on LOCK_scheduler_data 01068 */ 01069 01070 void 01071 Event_scheduler::drop_matching_events(THD *thd, LEX_STRING *pattern, 01072 bool (*comparator)(Event_timed *,LEX_STRING *)) 01073 { 01074 DBUG_ENTER("Event_scheduler::drop_matching_events"); 01075 DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern->length, pattern->str, 01076 state)); 01077 if (is_running_or_suspended()) 01078 { 01079 uint i= 0, dropped= 0; 01080 while (i < queue.elements) 01081 { 01082 Event_timed *et= (Event_timed *) queue_element(&queue, i); 01083 DBUG_PRINT("info", ("[%s.%s]?", et->dbname.str, et->name.str)); 01084 if (comparator(et, pattern)) 01085 { 01086 /* 01087 The queue is ordered. If we remove an element, then all elements after 01088 it will shift one position to the left, if we imagine it as an array 01089 from left to the right. In this case we should not increment the 01090 counter and the (i < queue.elements) condition is ok. 
01091 */ 01092 queue_remove(&queue, i); 01093 01094 /* See replace_event() */ 01095 switch (et->kill_thread(thd)) { 01096 case EVEX_CANT_KILL: 01097 /* Don't delete but continue */ 01098 et->flags |= EVENT_FREE_WHEN_FINISHED; 01099 ++dropped; 01100 break; 01101 case 0: 01102 delete et; 01103 ++dropped; 01104 break; 01105 default: 01106 DBUG_ASSERT(0); 01107 } 01108 } 01109 else 01110 i++; 01111 } 01112 DBUG_PRINT("info", ("Dropped %lu", dropped)); 01113 } 01114 /* 01115 Don't send COND_new_work because no need to wake up the scheduler thread. 01116 When it wakes next time up it will recalculate how much more it should 01117 sleep if the top of the queue has been changed by this method. 01118 */ 01119 01120 DBUG_VOID_RETURN; 01121 } 01122 01123 01124 /* 01125 Drops all events from the in-memory queue and disk that are from 01126 certain schema. 01127 01128 SYNOPSIS 01129 Event_scheduler::drop_schema_events() 01130 thd THD 01131 db The schema name 01132 01133 RETURN VALUE 01134 -1 Scheduler not working 01135 >=0 Number of dropped events 01136 */ 01137 01138 int 01139 Event_scheduler::drop_schema_events(THD *thd, LEX_STRING *schema) 01140 { 01141 int ret; 01142 DBUG_ENTER("Event_scheduler::drop_schema_events"); 01143 LOCK_SCHEDULER_DATA(); 01144 if (is_running_or_suspended()) 01145 drop_matching_events(thd, schema, event_timed_db_equal); 01146 01147 ret= db_drop_events_from_table(thd, schema); 01148 UNLOCK_SCHEDULER_DATA(); 01149 01150 DBUG_RETURN(ret); 01151 } 01152 01153 01154 extern pthread_attr_t connection_attrib; 01155 01156 01157 /* 01158 Starts the event scheduler 01159 01160 SYNOPSIS 01161 Event_scheduler::start() 01162 01163 RETURN VALUE 01164 FALSE OK 01165 TRUE Error 01166 */ 01167 01168 bool 01169 Event_scheduler::start() 01170 { 01171 bool ret= FALSE; 01172 pthread_t th; 01173 DBUG_ENTER("Event_scheduler::start"); 01174 01175 LOCK_SCHEDULER_DATA(); 01176 /* If already working or starting don't make another attempt */ 01177 DBUG_ASSERT(state == 
INITIALIZED); 01178 if (state > INITIALIZED) 01179 { 01180 DBUG_PRINT("info", ("scheduler is already running or starting")); 01181 ret= TRUE; 01182 goto end; 01183 } 01184 01185 /* 01186 Now if another thread calls start it will bail-out because the branch 01187 above will be executed. Thus no two or more child threads will be forked. 01188 If the child thread cannot start for some reason then `state` is set 01189 to CANTSTART and COND_started is also signaled. In this case we 01190 set `state` back to INITIALIZED so another attempt to start the scheduler 01191 can be made. 01192 */ 01193 state= COMMENCING; 01194 /* Fork */ 01195 if (pthread_create(&th, &connection_attrib, event_scheduler_thread, 01196 (void*)this)) 01197 { 01198 DBUG_PRINT("error", ("cannot create a new thread")); 01199 state= INITIALIZED; 01200 ret= TRUE; 01201 goto end; 01202 } 01203 01204 /* Wait till the child thread has booted (w/ or wo success) */ 01205 while (!is_running_or_suspended() && state != CANTSTART) 01206 cond_wait(COND_started_or_stopped, &LOCK_scheduler_data); 01207 01208 /* 01209 If we cannot start for some reason then don't prohibit further attempts. 01210 Set back to INITIALIZED. 01211 */ 01212 if (state == CANTSTART) 01213 { 01214 state= INITIALIZED; 01215 ret= TRUE; 01216 goto end; 01217 } 01218 01219 end: 01220 UNLOCK_SCHEDULER_DATA(); 01221 DBUG_RETURN(ret); 01222 } 01223 01224 01225 /* 01226 Starts the event scheduler in suspended mode. 01227 01228 SYNOPSIS 01229 Event_scheduler::start_suspended() 01230 01231 RETURN VALUE 01232 TRUE OK 01233 FALSE Error 01234 */ 01235 01236 bool 01237 Event_scheduler::start_suspended() 01238 { 01239 DBUG_ENTER("Event_scheduler::start_suspended"); 01240 start_scheduler_suspended= TRUE; 01241 DBUG_RETURN(start()); 01242 } 01243 01244 01245 01246 /* 01247 Report back that we cannot start. Used for ocasions where 01248 we can't go into ::run() and have to report externally. 
01249 01250 SYNOPSIS 01251 Event_scheduler::report_error_during_start() 01252 */ 01253 01254 inline void 01255 Event_scheduler::report_error_during_start() 01256 { 01257 DBUG_ENTER("Event_scheduler::report_error_during_start"); 01258 01259 LOCK_SCHEDULER_DATA(); 01260 state= CANTSTART; 01261 DBUG_PRINT("info", ("Sending back COND_started_or_stopped")); 01262 pthread_cond_signal(&cond_vars[COND_started_or_stopped]); 01263 UNLOCK_SCHEDULER_DATA(); 01264 01265 DBUG_VOID_RETURN; 01266 } 01267 01268 01269 /* 01270 The internal loop of the event scheduler 01271 01272 SYNOPSIS 01273 Event_scheduler::run() 01274 thd Thread 01275 01276 RETURN VALUE 01277 FALSE OK 01278 TRUE Failure 01279 */ 01280 01281 bool 01282 Event_scheduler::run(THD *thd) 01283 { 01284 int ret; 01285 struct timespec abstime; 01286 DBUG_ENTER("Event_scheduler::run"); 01287 DBUG_PRINT("enter", ("thd=%p", thd)); 01288 01289 LOCK_SCHEDULER_DATA(); 01290 ret= load_events_from_db(thd); 01291 01292 if (!ret) 01293 { 01294 thread_id= thd->thread_id; 01295 state= start_scheduler_suspended? 
SUSPENDED:RUNNING; 01296 start_scheduler_suspended= FALSE; 01297 } 01298 else 01299 state= CANTSTART; 01300 01301 DBUG_PRINT("info", ("Sending back COND_started_or_stopped")); 01302 pthread_cond_signal(&cond_vars[COND_started_or_stopped]); 01303 if (ret) 01304 { 01305 UNLOCK_SCHEDULER_DATA(); 01306 DBUG_RETURN(TRUE); 01307 } 01308 if (!check_n_suspend_if_needed(thd)) 01309 UNLOCK_SCHEDULER_DATA(); 01310 01311 sql_print_information("SCHEDULER: Manager thread started with id %lu", 01312 thd->thread_id); 01313 abstime.tv_nsec= 0; 01314 while (is_running_or_suspended()) 01315 { 01316 Event_timed *et; 01317 01318 LOCK_SCHEDULER_DATA(); 01319 if (check_n_wait_for_non_empty_queue(thd)) 01320 continue; 01321 01322 /* On TRUE data is unlocked, go back to the beginning */ 01323 if (check_n_suspend_if_needed(thd)) 01324 continue; 01325 01326 /* Guaranteed locked here */ 01327 if (state == IN_SHUTDOWN || shutdown_in_progress) 01328 { 01329 UNLOCK_SCHEDULER_DATA(); 01330 break; 01331 } 01332 DBUG_ASSERT(state == RUNNING); 01333 01334 et= (Event_timed *)queue_top(&queue); 01335 01336 /* Skip disabled events */ 01337 if (et->status != Event_timed::ENABLED) 01338 { 01339 /* 01340 It could be a one-timer scheduled for a time, already in the past when the 01341 scheduler was suspended. 
01342 */ 01343 sql_print_information("SCHEDULER: Found a disabled event %*s.%*s in the queue", 01344 et->dbname.length, et->dbname.str, et->name.length, 01345 et->name.str); 01346 queue_remove(&queue, 0); 01347 /* ToDo: check this again */ 01348 if (et->dropped) 01349 et->drop(thd); 01350 delete et; 01351 UNLOCK_SCHEDULER_DATA(); 01352 continue; 01353 } 01354 thd->proc_info= (char *)"Computing"; 01355 DBUG_PRINT("evex manager",("computing time to sleep till next exec")); 01356 /* Timestamp is in UTC */ 01357 abstime.tv_sec= sec_since_epoch_TIME(&et->execute_at); 01358 01359 thd->end_time(); 01360 if (abstime.tv_sec > thd->query_start()) 01361 { 01362 /* Event trigger time is in the future */ 01363 thd->proc_info= (char *)"Sleep"; 01364 DBUG_PRINT("info", ("Going to sleep. Should wakeup after approx %d secs", 01365 abstime.tv_sec - thd->query_start())); 01366 DBUG_PRINT("info", ("Entering condition because waiting for activation")); 01367 /* 01368 Use THD::enter_cond()/exit_cond() or we won't be able to kill a 01369 sleeping thread. Though ::stop() can do it by sending COND_new_work 01370 an user can't by just issuing 'KILL x'; . In the latter case 01371 pthread_cond_timedwait() will wait till `abstime`. 01372 "Sleeping until next time" 01373 */ 01374 thd->enter_cond(&cond_vars[COND_new_work],&LOCK_scheduler_data,"Sleeping"); 01375 01376 pthread_cond_timedwait(&cond_vars[COND_new_work], &LOCK_scheduler_data, 01377 &abstime); 01378 01379 DBUG_PRINT("info", ("Manager woke up. state is %d", state)); 01380 /* 01381 If we get signal we should recalculate the whether it's the right time 01382 because there could be : 01383 1. Spurious wake-up 01384 2. The top of the queue was changed (new one becase of add/drop/replace) 01385 */ 01386 /* This will do implicit UNLOCK_SCHEDULER_DATA() */ 01387 thd->exit_cond(""); 01388 } 01389 else 01390 { 01391 thd->proc_info= (char *)"Executing"; 01392 /* 01393 Execute the event. An error may occur if a thread cannot be forked. 
01394 In this case stop the manager. 01395 We should enter ::execute_top() with locked LOCK_scheduler_data. 01396 */ 01397 int ret= execute_top(thd); 01398 UNLOCK_SCHEDULER_DATA(); 01399 if (ret) 01400 break; 01401 } 01402 } 01403 01404 thd->proc_info= (char *)"Cleaning"; 01405 01406 LOCK_SCHEDULER_DATA(); 01407 /* 01408 It's possible that a user has used (SQL)COM_KILL. Hence set the appropriate 01409 state because it is only set by ::stop(). 01410 */ 01411 if (state != IN_SHUTDOWN) 01412 { 01413 DBUG_PRINT("info", ("We got KILL but the but not from ::stop()")); 01414 state= IN_SHUTDOWN; 01415 } 01416 UNLOCK_SCHEDULER_DATA(); 01417 01418 sql_print_information("SCHEDULER: Shutting down"); 01419 01420 thd->proc_info= (char *)"Cleaning queue"; 01421 clean_queue(thd); 01422 THD_CHECK_SENTRY(thd); 01423 01424 /* free mamager_root memory but don't destroy the root */ 01425 thd->proc_info= (char *)"Cleaning memory root"; 01426 free_root(&scheduler_root, MYF(0)); 01427 THD_CHECK_SENTRY(thd); 01428 01429 /* 01430 We notify the waiting thread which shutdowns us that we have cleaned. 01431 There are few more instructions to be executed in this pthread but 01432 they don't affect manager structures thus it's safe to signal already 01433 at this point. 01434 */ 01435 LOCK_SCHEDULER_DATA(); 01436 thd->proc_info= (char *)"Sending shutdown signal"; 01437 DBUG_PRINT("info", ("Sending COND_started_or_stopped")); 01438 if (state == IN_SHUTDOWN) 01439 pthread_cond_signal(&cond_vars[COND_started_or_stopped]); 01440 01441 state= INITIALIZED; 01442 /* 01443 We set it here because ::run() can stop not only because of ::stop() 01444 call but also because of `KILL x` 01445 */ 01446 thread_id= 0; 01447 sql_print_information("SCHEDULER: Stopped"); 01448 UNLOCK_SCHEDULER_DATA(); 01449 01450 /* We have modified, we set back */ 01451 thd->query= NULL; 01452 thd->query_length= 0; 01453 01454 DBUG_RETURN(FALSE); 01455 } 01456 01457 01458 /* 01459 Executes the top element of the queue. 
Auxiliary method for ::run(). 01460 01461 SYNOPSIS 01462 Event_scheduler::execute_top() 01463 01464 RETURN VALUE 01465 FALSE OK 01466 TRUE Failure 01467 01468 NOTE 01469 NO locking is done. EXPECTED is that the caller should have locked 01470 the queue (w/ LOCK_scheduler_data). 01471 */ 01472 01473 bool 01474 Event_scheduler::execute_top(THD *thd) 01475 { 01476 int spawn_ret_code; 01477 bool ret= FALSE; 01478 DBUG_ENTER("Event_scheduler::execute_top"); 01479 DBUG_PRINT("enter", ("thd=%p", thd)); 01480 01481 Event_timed *et= (Event_timed *)queue_top(&queue); 01482 01483 /* Is it good idea to pass a stack address ?*/ 01484 Worker_thread_param param(et); 01485 01486 pthread_mutex_lock(¶m.LOCK_started); 01487 /* 01488 We don't lock LOCK_scheduler_data fpr workers_increment() because it's a 01489 pre-requisite for calling the current_method. 01490 */ 01491 switch ((spawn_ret_code= et->spawn_now(event_worker_thread, ¶m))) { 01492 case EVENT_EXEC_CANT_FORK: 01493 /* 01494 We don't lock LOCK_scheduler_data here because it's a pre-requisite 01495 for calling the current_method. 01496 */ 01497 sql_print_error("SCHEDULER: Problem while trying to create a thread"); 01498 ret= TRUE; 01499 break; 01500 case EVENT_EXEC_ALREADY_EXEC: 01501 /* 01502 We don't lock LOCK_scheduler_data here because it's a pre-requisite 01503 for calling the current_method. 01504 */ 01505 sql_print_information("SCHEDULER: %s.%s in execution. 
Skip this time.", 01506 et->dbname.str, et->name.str); 01507 if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == Event_timed::DISABLED) 01508 queue_remove(&queue, 0);// 0 is top, internally 1 01509 else 01510 queue_replaced(&queue); 01511 break; 01512 default: 01513 DBUG_ASSERT(!spawn_ret_code); 01514 if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == Event_timed::DISABLED) 01515 queue_remove(&queue, 0);// 0 is top, internally 1 01516 else 01517 queue_replaced(&queue); 01518 /* 01519 We don't lock LOCK_scheduler_data here because it's a pre-requisite 01520 for calling the current_method. 01521 */ 01522 if (likely(!spawn_ret_code)) 01523 { 01524 /* Wait the forked thread to start */ 01525 do { 01526 pthread_cond_wait(¶m.COND_started, ¶m.LOCK_started); 01527 } while (!param.started); 01528 } 01529 /* 01530 param was allocated on the stack so no explicit delete as well as 01531 in this moment it's no more used in the spawned thread so it's safe 01532 to be deleted. 01533 */ 01534 break; 01535 } 01536 pthread_mutex_unlock(¶m.LOCK_started); 01537 /* `param` is on the stack and will be destructed by the compiler */ 01538 01539 DBUG_RETURN(ret); 01540 } 01541 01542 01543 /* 01544 Cleans the scheduler's queue. Auxiliary method for ::run(). 
01545 01546 SYNOPSIS 01547 Event_scheduler::clean_queue() 01548 thd Thread 01549 */ 01550 01551 void 01552 Event_scheduler::clean_queue(THD *thd) 01553 { 01554 CHARSET_INFO *scs= system_charset_info; 01555 uint i; 01556 DBUG_ENTER("Event_scheduler::clean_queue"); 01557 DBUG_PRINT("enter", ("thd=%p", thd)); 01558 01559 LOCK_SCHEDULER_DATA(); 01560 stop_all_running_events(thd); 01561 UNLOCK_SCHEDULER_DATA(); 01562 01563 sql_print_information("SCHEDULER: Emptying the queue"); 01564 01565 /* empty the queue */ 01566 for (i= 0; i < queue.elements; ++i) 01567 { 01568 Event_timed *et= (Event_timed *) queue_element(&queue, i); 01569 et->free_sp(); 01570 delete et; 01571 } 01572 resize_queue(&queue, 0); 01573 01574 DBUG_VOID_RETURN; 01575 } 01576 01577 01578 /* 01579 Stops all running events 01580 01581 SYNOPSIS 01582 Event_scheduler::stop_all_running_events() 01583 thd Thread 01584 01585 NOTE 01586 LOCK_scheduler data must be acquired prior to call to this method 01587 */ 01588 01589 void 01590 Event_scheduler::stop_all_running_events(THD *thd) 01591 { 01592 CHARSET_INFO *scs= system_charset_info; 01593 uint i; 01594 DYNAMIC_ARRAY running_threads; 01595 THD *tmp; 01596 DBUG_ENTER("Event_scheduler::stop_all_running_events"); 01597 DBUG_PRINT("enter", ("workers_count=%d", workers_count())); 01598 01599 my_init_dynamic_array(&running_threads, sizeof(ulong), 10, 10); 01600 01601 bool had_super= FALSE; 01602 VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list 01603 I_List_iterator<THD> it(threads); 01604 while ((tmp=it++)) 01605 { 01606 if (tmp->command == COM_DAEMON) 01607 continue; 01608 if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER) 01609 push_dynamic(&running_threads, (gptr) &tmp->thread_id); 01610 } 01611 VOID(pthread_mutex_unlock(&LOCK_thread_count)); 01612 01613 /* We need temporarily SUPER_ACL to be able to kill our offsprings */ 01614 if (!(thd->security_ctx->master_access & SUPER_ACL)) 01615 thd->security_ctx->master_access|= SUPER_ACL; 01616 
else 01617 had_super= TRUE; 01618 01619 char tmp_buff[10*STRING_BUFFER_USUAL_SIZE]; 01620 char int_buff[STRING_BUFFER_USUAL_SIZE]; 01621 String tmp_string(tmp_buff, sizeof(tmp_buff), scs); 01622 String int_string(int_buff, sizeof(int_buff), scs); 01623 tmp_string.length(0); 01624 01625 for (i= 0; i < running_threads.elements; ++i) 01626 { 01627 int ret; 01628 ulong thd_id= *dynamic_element(&running_threads, i, ulong*); 01629 01630 int_string.set((longlong) thd_id,scs); 01631 tmp_string.append(int_string); 01632 if (i < running_threads.elements - 1) 01633 tmp_string.append(' '); 01634 01635 if ((ret= kill_one_thread(thd, thd_id, FALSE))) 01636 { 01637 sql_print_error("SCHEDULER: Error killing %lu code=%d", thd_id, ret); 01638 break; 01639 } 01640 } 01641 if (running_threads.elements) 01642 sql_print_information("SCHEDULER: Killing workers :%s", tmp_string.c_ptr()); 01643 01644 if (!had_super) 01645 thd->security_ctx->master_access &= ~SUPER_ACL; 01646 01647 delete_dynamic(&running_threads); 01648 01649 sql_print_information("SCHEDULER: Waiting for worker threads to finish"); 01650 01651 while (workers_count()) 01652 my_sleep(100000); 01653 01654 DBUG_VOID_RETURN; 01655 } 01656 01657 01658 /* 01659 Stops the event scheduler 01660 01661 SYNOPSIS 01662 Event_scheduler::stop() 01663 01664 RETURN VALUE 01665 OP_OK OK 01666 OP_CANT_KILL Error during stopping of manager thread 01667 OP_NOT_RUNNING Manager not working 01668 01669 NOTE 01670 The caller must have acquited LOCK_scheduler_data. 01671 */ 01672 01673 enum Event_scheduler::enum_error_code 01674 Event_scheduler::stop() 01675 { 01676 THD *thd= current_thd; 01677 DBUG_ENTER("Event_scheduler::stop"); 01678 DBUG_PRINT("enter", ("thd=%p", current_thd)); 01679 01680 LOCK_SCHEDULER_DATA(); 01681 if (!is_running_or_suspended()) 01682 { 01683 /* 01684 One situation to be here is if there was a start that forked a new 01685 thread but the new thread did not acquire yet LOCK_scheduler_data. 
01686 Hence, in this case return an error. 01687 */ 01688 DBUG_PRINT("info", ("manager not running but %d. doing nothing", state)); 01689 UNLOCK_SCHEDULER_DATA(); 01690 DBUG_RETURN(OP_NOT_RUNNING); 01691 } 01692 state= IN_SHUTDOWN; 01693 01694 DBUG_PRINT("info", ("Manager thread has id %d", thread_id)); 01695 sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id); 01696 01697 /* 01698 Sending the COND_new_work to ::run() is a way to get this working without 01699 race conditions. If we use kill_one_thread() it will call THD::awake() and 01700 because in ::run() both THD::enter_cond()/::exit_cond() are used, 01701 THD::awake() will try to lock LOCK_scheduler_data. If we UNLOCK it before, 01702 then the pthread_cond_signal(COND_started_or_stopped) could be signaled in 01703 ::run() and we can miss the signal before we relock. A way is to use 01704 another mutex for this shutdown procedure but better not. 01705 */ 01706 pthread_cond_signal(&cond_vars[COND_new_work]); 01707 /* Or we are suspended - then we should wake up */ 01708 pthread_cond_signal(&cond_vars[COND_suspend_or_resume]); 01709 01710 /* Guarantee we don't catch spurious signals */ 01711 sql_print_information("SCHEDULER: Waiting the manager thread to reply"); 01712 while (state != INITIALIZED) 01713 { 01714 DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager " 01715 "thread. Current value of state is %d . " 01716 "workers count=%d", state, workers_count())); 01717 cond_wait(COND_started_or_stopped, &LOCK_scheduler_data); 01718 } 01719 DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT")); 01720 UNLOCK_SCHEDULER_DATA(); 01721 01722 DBUG_RETURN(OP_OK); 01723 } 01724 01725 01726 /* 01727 Suspends or resumes the scheduler. 01728 SUSPEND - it won't execute any event till resumed. 01729 RESUME - it will resume if suspended. 
01730 01731 SYNOPSIS 01732 Event_scheduler::suspend_or_resume() 01733 01734 RETURN VALUE 01735 OP_OK OK 01736 */ 01737 01738 enum Event_scheduler::enum_error_code 01739 Event_scheduler::suspend_or_resume( 01740 enum Event_scheduler::enum_suspend_or_resume action) 01741 { 01742 DBUG_ENTER("Event_scheduler::suspend_or_resume"); 01743 DBUG_PRINT("enter", ("action=%d", action)); 01744 01745 LOCK_SCHEDULER_DATA(); 01746 01747 if ((action == SUSPEND && state == SUSPENDED) || 01748 (action == RESUME && state == RUNNING)) 01749 { 01750 DBUG_PRINT("info", ("Either trying to suspend suspended or resume " 01751 "running scheduler. Doing nothing.")); 01752 } 01753 else 01754 { 01755 /* Wake the main thread up if he is asleep */ 01756 DBUG_PRINT("info", ("Sending signal")); 01757 if (action==SUSPEND) 01758 { 01759 state= SUSPENDED; 01760 pthread_cond_signal(&cond_vars[COND_new_work]); 01761 } 01762 else 01763 { 01764 state= RUNNING; 01765 pthread_cond_signal(&cond_vars[COND_suspend_or_resume]); 01766 } 01767 DBUG_PRINT("info", ("Waiting on COND_suspend_or_resume")); 01768 cond_wait(COND_suspend_or_resume, &LOCK_scheduler_data); 01769 DBUG_PRINT("info", ("Got response")); 01770 } 01771 UNLOCK_SCHEDULER_DATA(); 01772 DBUG_RETURN(OP_OK); 01773 } 01774 01775 01776 /* 01777 Returns the number of executing events. 
01778 01779 SYNOPSIS 01780 Event_scheduler::workers_count() 01781 */ 01782 01783 uint 01784 Event_scheduler::workers_count() 01785 { 01786 THD *tmp; 01787 uint count= 0; 01788 01789 DBUG_ENTER("Event_scheduler::workers_count"); 01790 VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list 01791 I_List_iterator<THD> it(threads); 01792 while ((tmp=it++)) 01793 { 01794 if (tmp->command == COM_DAEMON) 01795 continue; 01796 if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER) 01797 ++count; 01798 } 01799 VOID(pthread_mutex_unlock(&LOCK_thread_count)); 01800 DBUG_PRINT("exit", ("%d", count)); 01801 DBUG_RETURN(count); 01802 } 01803 01804 01805 /* 01806 Checks and suspends if needed 01807 01808 SYNOPSIS 01809 Event_scheduler::check_n_suspend_if_needed() 01810 thd Thread 01811 01812 RETURN VALUE 01813 FALSE Not suspended, we haven't slept 01814 TRUE We were suspended. LOCK_scheduler_data is unlocked. 01815 01816 NOTE 01817 The caller should have locked LOCK_scheduler_data! 01818 The mutex will be unlocked in case this function returns TRUE 01819 */ 01820 01821 bool 01822 Event_scheduler::check_n_suspend_if_needed(THD *thd) 01823 { 01824 bool was_suspended= FALSE; 01825 DBUG_ENTER("Event_scheduler::check_n_suspend_if_needed"); 01826 if (thd->killed && !shutdown_in_progress) 01827 { 01828 state= SUSPENDED; 01829 thd->killed= THD::NOT_KILLED; 01830 } 01831 if (state == SUSPENDED) 01832 { 01833 thd->enter_cond(&cond_vars[COND_suspend_or_resume], &LOCK_scheduler_data, 01834 "Suspended"); 01835 /* Send back signal to the thread that asked us to suspend operations */ 01836 pthread_cond_signal(&cond_vars[COND_suspend_or_resume]); 01837 sql_print_information("SCHEDULER: Suspending operations"); 01838 was_suspended= TRUE; 01839 } 01840 while (state == SUSPENDED) 01841 { 01842 cond_wait(COND_suspend_or_resume, &LOCK_scheduler_data); 01843 DBUG_PRINT("info", ("Woke up after waiting on COND_suspend_or_resume")); 01844 if (state != SUSPENDED) 01845 { 01846 
pthread_cond_signal(&cond_vars[COND_suspend_or_resume]); 01847 sql_print_information("SCHEDULER: Resuming operations"); 01848 } 01849 } 01850 if (was_suspended) 01851 { 01852 if (queue.elements) 01853 { 01854 uint i; 01855 DBUG_PRINT("info", ("We have to recompute the execution times")); 01856 01857 for (i= 0; i < queue.elements; i++) 01858 { 01859 ((Event_timed*)queue_element(&queue, i))->compute_next_execution_time(); 01860 ((Event_timed*)queue_element(&queue, i))->update_fields(thd); 01861 } 01862 queue_fix(&queue); 01863 } 01864 /* This will implicitly unlock LOCK_scheduler_data */ 01865 thd->exit_cond(""); 01866 } 01867 DBUG_RETURN(was_suspended); 01868 } 01869 01870 01871 /* 01872 Checks for empty queue and waits till new element gets in 01873 01874 SYNOPSIS 01875 Event_scheduler::check_n_wait_for_non_empty_queue() 01876 thd Thread 01877 01878 RETURN VALUE 01879 FALSE Did not wait - LOCK_scheduler_data still locked. 01880 TRUE Waited - LOCK_scheduler_data unlocked. 01881 01882 NOTE 01883 The caller should have locked LOCK_scheduler_data! 01884 */ 01885 01886 bool 01887 Event_scheduler::check_n_wait_for_non_empty_queue(THD *thd) 01888 { 01889 bool slept= FALSE; 01890 DBUG_ENTER("Event_scheduler::check_n_wait_for_non_empty_queue"); 01891 DBUG_PRINT("enter", ("q.elements=%lu state=%s", 01892 queue.elements, states_names[state])); 01893 01894 if (!queue.elements) 01895 thd->enter_cond(&cond_vars[COND_new_work], &LOCK_scheduler_data, 01896 "Empty queue, sleeping"); 01897 01898 /* Wait in a loop protecting against catching spurious signals */ 01899 while (!queue.elements && state == RUNNING) 01900 { 01901 slept= TRUE; 01902 DBUG_PRINT("info", ("Entering condition because of empty queue")); 01903 cond_wait(COND_new_work, &LOCK_scheduler_data); 01904 DBUG_PRINT("info", ("Manager woke up. Hope we have events now. state=%d", 01905 state)); 01906 /* 01907 exit_cond does implicit mutex_UNLOCK, we needed it locked if 01908 1. we loop again 01909 2. 
end the current loop and start doing calculations 01910 */ 01911 } 01912 if (slept) 01913 thd->exit_cond(""); 01914 01915 DBUG_PRINT("exit", ("q.elements=%lu state=%s thd->killed=%d", 01916 queue.elements, states_names[state], thd->killed)); 01917 01918 DBUG_RETURN(slept); 01919 } 01920 01921 01922 /* 01923 Wrapper for pthread_mutex_lock 01924 01925 SYNOPSIS 01926 Event_scheduler::lock_data() 01927 mutex Mutex to lock 01928 line The line number on which the lock is done 01929 01930 RETURN VALUE 01931 Error code of pthread_mutex_lock() 01932 */ 01933 01934 inline void 01935 Event_scheduler::lock_data(const char *func, uint line) 01936 { 01937 DBUG_ENTER("Event_scheduler::lock_mutex"); 01938 DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u", 01939 &LOCK_scheduler_data, func, line)); 01940 pthread_mutex_lock(&LOCK_scheduler_data); 01941 mutex_last_locked_in_func= func; 01942 mutex_last_locked_at_line= line; 01943 mutex_scheduler_data_locked= TRUE; 01944 DBUG_VOID_RETURN; 01945 } 01946 01947 01948 /* 01949 Wrapper for pthread_mutex_unlock 01950 01951 SYNOPSIS 01952 Event_scheduler::unlock_data() 01953 mutex Mutex to unlock 01954 line The line number on which the unlock is done 01955 */ 01956 01957 inline void 01958 Event_scheduler::unlock_data(const char *func, uint line) 01959 { 01960 DBUG_ENTER("Event_scheduler::UNLOCK_mutex"); 01961 DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u", 01962 &LOCK_scheduler_data, func, line)); 01963 mutex_last_unlocked_at_line= line; 01964 mutex_scheduler_data_locked= FALSE; 01965 mutex_last_unlocked_in_func= func; 01966 pthread_mutex_unlock(&LOCK_scheduler_data); 01967 DBUG_VOID_RETURN; 01968 } 01969 01970 01971 /* 01972 Wrapper for pthread_cond_wait 01973 01974 SYNOPSIS 01975 Event_scheduler::cond_wait() 01976 cond Conditional to wait for 01977 mutex Mutex of the conditional 01978 01979 RETURN VALUE 01980 Error code of pthread_cond_wait() 01981 */ 01982 01983 inline int 01984 Event_scheduler::cond_wait(enum 
Event_scheduler::enum_cond_vars cond, 01985 pthread_mutex_t *mutex) 01986 { 01987 int ret; 01988 DBUG_ENTER("Event_scheduler::cond_wait"); 01989 DBUG_PRINT("enter", ("cond=%s mutex=%p", cond_vars_names[cond], mutex)); 01990 ret= pthread_cond_wait(&cond_vars[cond_waiting_on=cond], mutex); 01991 cond_waiting_on= COND_NONE; 01992 DBUG_RETURN(ret); 01993 } 01994 01995 01996 /* 01997 Checks whether the scheduler is in a running or suspended state. 01998 01999 SYNOPSIS 02000 Event_scheduler::is_running_or_suspended() 02001 02002 RETURN VALUE 02003 TRUE Either running or suspended 02004 FALSE IN_SHUTDOWN, not started, etc. 02005 */ 02006 02007 inline bool 02008 Event_scheduler::is_running_or_suspended() 02009 { 02010 return (state == SUSPENDED || state == RUNNING); 02011 } 02012 02013 02014 /* 02015 Returns the current state of the scheduler 02016 02017 SYNOPSIS 02018 Event_scheduler::get_state() 02019 */ 02020 02021 enum Event_scheduler::enum_state 02022 Event_scheduler::get_state() 02023 { 02024 enum Event_scheduler::enum_state ret; 02025 DBUG_ENTER("Event_scheduler::get_state"); 02026 /* lock_data & unlock_data are not static */ 02027 pthread_mutex_lock(&singleton.LOCK_scheduler_data); 02028 ret= singleton.state; 02029 pthread_mutex_unlock(&singleton.LOCK_scheduler_data); 02030 DBUG_RETURN(ret); 02031 } 02032 02033 02034 /* 02035 Returns whether the scheduler was initialized. 
02036 02037 SYNOPSIS 02038 Event_scheduler::initialized() 02039 02040 RETURN VALUE 02041 FALSE Was not initialized so far 02042 TRUE Was initialized 02043 */ 02044 02045 bool 02046 Event_scheduler::initialized() 02047 { 02048 DBUG_ENTER("Event_scheduler::initialized"); 02049 DBUG_RETURN(Event_scheduler::get_state() != UNINITIALIZED); 02050 } 02051 02052 02053 /* 02054 Returns the number of elements in the queue 02055 02056 SYNOPSIS 02057 Event_scheduler::events_count() 02058 02059 RETURN VALUE 02060 0 Number of Event_timed objects in the queue 02061 */ 02062 02063 uint 02064 Event_scheduler::events_count() 02065 { 02066 uint n; 02067 DBUG_ENTER("Event_scheduler::events_count"); 02068 LOCK_SCHEDULER_DATA(); 02069 n= queue.elements; 02070 UNLOCK_SCHEDULER_DATA(); 02071 02072 DBUG_RETURN(n); 02073 } 02074 02075 02076 /* 02077 Looks for a named event in mysql.event and then loads it from 02078 the table, compiles and inserts it into the cache. 02079 02080 SYNOPSIS 02081 Event_scheduler::load_named_event() 02082 thd THD 02083 etn The name of the event to load and compile on scheduler's root 02084 etn_new The loaded event 02085 02086 RETURN VALUE 02087 NULL Error during compile or the event is non-enabled. 
02088 otherwise Address 02089 */ 02090 02091 enum Event_scheduler::enum_error_code 02092 Event_scheduler::load_named_event(THD *thd, Event_timed *etn, Event_timed **etn_new) 02093 { 02094 int ret= 0; 02095 MEM_ROOT *tmp_mem_root; 02096 Event_timed *et_loaded= NULL; 02097 Open_tables_state backup; 02098 02099 DBUG_ENTER("Event_scheduler::load_and_compile_event"); 02100 DBUG_PRINT("enter",("thd=%p name:%*s",thd, etn->name.length, etn->name.str)); 02101 02102 thd->reset_n_backup_open_tables_state(&backup); 02103 /* No need to use my_error() here because db_find_event() has done it */ 02104 { 02105 sp_name spn(etn->dbname, etn->name); 02106 ret= db_find_event(thd, &spn, &et_loaded, NULL, &scheduler_root); 02107 } 02108 thd->restore_backup_open_tables_state(&backup); 02109 /* In this case no memory was allocated so we don't need to clean */ 02110 if (ret) 02111 DBUG_RETURN(OP_LOAD_ERROR); 02112 02113 if (et_loaded->status != Event_timed::ENABLED) 02114 { 02115 /* 02116 We don't load non-enabled events. 02117 In db_find_event() `et_new` was allocated on the heap and not on 02118 scheduler_root therefore we delete it here. 02119 */ 02120 delete et_loaded; 02121 DBUG_RETURN(OP_DISABLED_EVENT); 02122 } 02123 02124 et_loaded->compute_next_execution_time(); 02125 *etn_new= et_loaded; 02126 02127 DBUG_RETURN(OP_OK); 02128 } 02129 02130 02131 /* 02132 Loads all ENABLED events from mysql.event into the prioritized 02133 queue. Called during scheduler main thread initialization. Compiles 02134 the events. Creates Event_timed instances for every ENABLED event 02135 from mysql.event. 02136 02137 SYNOPSIS 02138 Event_scheduler::load_events_from_db() 02139 thd - Thread context. Used for memory allocation in some cases. 02140 02141 RETURN VALUE 02142 0 OK 02143 !0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP, 02144 EVEX_COMPILE_ERROR) - in all these cases mysql.event was 02145 tampered. 
02146 02147 NOTES 02148 Reports the error to the console 02149 */ 02150 02151 int 02152 Event_scheduler::load_events_from_db(THD *thd) 02153 { 02154 TABLE *table; 02155 READ_RECORD read_record_info; 02156 int ret= -1; 02157 uint count= 0; 02158 bool clean_the_queue= FALSE; 02159 /* Compile the events on this root but only for syntax check, then discard */ 02160 MEM_ROOT boot_root; 02161 02162 DBUG_ENTER("Event_scheduler::load_events_from_db"); 02163 DBUG_PRINT("enter", ("thd=%p", thd)); 02164 02165 if (state > COMMENCING) 02166 { 02167 DBUG_ASSERT(0); 02168 sql_print_error("SCHEDULER: Trying to load events while already running."); 02169 DBUG_RETURN(EVEX_GENERAL_ERROR); 02170 } 02171 02172 if ((ret= Events::open_event_table(thd, TL_READ, &table))) 02173 { 02174 sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open."); 02175 DBUG_RETURN(EVEX_OPEN_TABLE_FAILED); 02176 } 02177 02178 init_alloc_root(&boot_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); 02179 init_read_record(&read_record_info, thd, table ,NULL,1,0); 02180 while (!(read_record_info.read_record(&read_record_info))) 02181 { 02182 Event_timed *et; 02183 if (!(et= new Event_timed)) 02184 { 02185 DBUG_PRINT("info", ("Out of memory")); 02186 clean_the_queue= TRUE; 02187 break; 02188 } 02189 DBUG_PRINT("info", ("Loading event from row.")); 02190 02191 if ((ret= et->load_from_row(&scheduler_root, table))) 02192 { 02193 clean_the_queue= TRUE; 02194 sql_print_error("SCHEDULER: Error while loading from mysql.event. " 02195 "Table probably corrupted"); 02196 break; 02197 } 02198 if (et->status != Event_timed::ENABLED) 02199 { 02200 DBUG_PRINT("info",("%s is disabled",et->name.str)); 02201 delete et; 02202 continue; 02203 } 02204 02205 DBUG_PRINT("info", ("Event %s loaded from row. 
", et->name.str)); 02206 02207 /* We load only on scheduler root just to check whether the body compiles */ 02208 switch (ret= et->compile(thd, &boot_root)) { 02209 case EVEX_MICROSECOND_UNSUP: 02210 et->free_sp(); 02211 sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not " 02212 "supported but found in mysql.event"); 02213 goto end; 02214 case EVEX_COMPILE_ERROR: 02215 sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.", 02216 et->dbname.str, et->name.str); 02217 goto end; 02218 default: 02219 /* Free it, it will be compiled again on the worker thread */ 02220 et->free_sp(); 02221 break; 02222 } 02223 02224 /* let's find when to be executed */ 02225 if (et->compute_next_execution_time()) 02226 { 02227 sql_print_error("SCHEDULER: Error while computing execution time of %s.%s." 02228 " Skipping", et->dbname.str, et->name.str); 02229 continue; 02230 } 02231 02232 DBUG_PRINT("load_events_from_db", ("Adding %p to the exec list.")); 02233 queue_insert_safe(&queue, (byte *) et); 02234 count++; 02235 } 02236 end: 02237 end_read_record(&read_record_info); 02238 free_root(&boot_root, MYF(0)); 02239 02240 if (clean_the_queue) 02241 { 02242 for (count= 0; count < queue.elements; ++count) 02243 queue_remove(&queue, 0); 02244 ret= -1; 02245 } 02246 else 02247 { 02248 ret= 0; 02249 sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s"); 02250 } 02251 02252 /* Force close to free memory */ 02253 thd->version--; 02254 02255 close_thread_tables(thd); 02256 02257 DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count)); 02258 DBUG_RETURN(ret); 02259 } 02260 02261 02262 /* 02263 Opens mysql.db and mysql.user and checks whether: 02264 1. mysql.db has column Event_priv at column 20 (0 based); 02265 2. 
mysql.user has column Event_priv at column 29 (0 based); 02266 02267 SYNOPSIS 02268 Event_scheduler::check_system_tables() 02269 */ 02270 02271 bool 02272 Event_scheduler::check_system_tables(THD *thd) 02273 { 02274 TABLE_LIST tables; 02275 bool not_used; 02276 Open_tables_state backup; 02277 bool ret; 02278 02279 DBUG_ENTER("Event_scheduler::check_system_tables"); 02280 DBUG_PRINT("enter", ("thd=%p", thd)); 02281 02282 thd->reset_n_backup_open_tables_state(&backup); 02283 02284 bzero((char*) &tables, sizeof(tables)); 02285 tables.db= (char*) "mysql"; 02286 tables.table_name= tables.alias= (char*) "db"; 02287 tables.lock_type= TL_READ; 02288 02289 if ((ret= simple_open_n_lock_tables(thd, &tables))) 02290 sql_print_error("Cannot open mysql.db"); 02291 else 02292 { 02293 ret= table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT, 02294 mysql_db_table_fields, &mysql_db_table_last_check, 02295 ER_CANNOT_LOAD_FROM_TABLE); 02296 close_thread_tables(thd); 02297 } 02298 if (ret) 02299 DBUG_RETURN(TRUE); 02300 02301 bzero((char*) &tables, sizeof(tables)); 02302 tables.db= (char*) "mysql"; 02303 tables.table_name= tables.alias= (char*) "user"; 02304 tables.lock_type= TL_READ; 02305 02306 if ((ret= simple_open_n_lock_tables(thd, &tables))) 02307 sql_print_error("Cannot open mysql.db"); 02308 else 02309 { 02310 if (tables.table->s->fields < 29 || 02311 strncmp(tables.table->field[29]->field_name, 02312 STRING_WITH_LEN("Event_priv"))) 02313 { 02314 sql_print_error("mysql.user has no `Event_priv` column at position 29"); 02315 ret= TRUE; 02316 } 02317 close_thread_tables(thd); 02318 } 02319 02320 thd->restore_backup_open_tables_state(&backup); 02321 02322 DBUG_RETURN(ret); 02323 } 02324 02325 02326 /* 02327 Inits mutexes. 02328 02329 SYNOPSIS 02330 Event_scheduler::init_mutexes() 02331 */ 02332 02333 void 02334 Event_scheduler::init_mutexes() 02335 { 02336 pthread_mutex_init(&singleton.LOCK_scheduler_data, MY_MUTEX_INIT_FAST); 02337 } 02338 02339 02340 /* 02341 Destroys mutexes. 
02342 02343 SYNOPSIS 02344 Event_scheduler::destroy_mutexes() 02345 */ 02346 02347 void 02348 Event_scheduler::destroy_mutexes() 02349 { 02350 pthread_mutex_destroy(&singleton.LOCK_scheduler_data); 02351 } 02352 02353 02354 /* 02355 Dumps some data about the internal status of the scheduler. 02356 02357 SYNOPSIS 02358 Event_scheduler::dump_internal_status() 02359 thd THD 02360 02361 RETURN VALUE 02362 0 OK 02363 1 Error 02364 */ 02365 02366 int 02367 Event_scheduler::dump_internal_status(THD *thd) 02368 { 02369 DBUG_ENTER("dump_internal_status"); 02370 #ifndef DBUG_OFF 02371 CHARSET_INFO *scs= system_charset_info; 02372 Protocol *protocol= thd->protocol; 02373 List<Item> field_list; 02374 int ret; 02375 char tmp_buff[5*STRING_BUFFER_USUAL_SIZE]; 02376 char int_buff[STRING_BUFFER_USUAL_SIZE]; 02377 String tmp_string(tmp_buff, sizeof(tmp_buff), scs); 02378 String int_string(int_buff, sizeof(int_buff), scs); 02379 tmp_string.length(0); 02380 int_string.length(0); 02381 02382 field_list.push_back(new Item_empty_string("Name", 20)); 02383 field_list.push_back(new Item_empty_string("Value",20)); 02384 if (protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS | 02385 Protocol::SEND_EOF)) 02386 DBUG_RETURN(1); 02387 02388 protocol->prepare_for_resend(); 02389 protocol->store(STRING_WITH_LEN("state"), scs); 02390 protocol->store(states_names[singleton.state].str, 02391 states_names[singleton.state].length, 02392 scs); 02393 02394 ret= protocol->write(); 02395 /* 02396 If not initialized - don't show anything else. get_instance() 02397 will otherwise implicitly initialize it. We don't want that. 02398 */ 02399 if (singleton.state >= INITIALIZED) 02400 { 02401 /* last locked at*/ 02402 /* 02403 The first thing to do, or get_instance() will overwrite the values. 
02404 mutex_last_locked_at_line / mutex_last_unlocked_at_line 02405 */ 02406 protocol->prepare_for_resend(); 02407 protocol->store(STRING_WITH_LEN("last locked at"), scs); 02408 tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), 02409 tmp_string.alloced_length(), "%s::%d", 02410 singleton.mutex_last_locked_in_func, 02411 singleton.mutex_last_locked_at_line)); 02412 protocol->store(&tmp_string); 02413 ret= protocol->write(); 02414 02415 /* last unlocked at*/ 02416 protocol->prepare_for_resend(); 02417 protocol->store(STRING_WITH_LEN("last unlocked at"), scs); 02418 tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), 02419 tmp_string.alloced_length(), "%s::%d", 02420 singleton.mutex_last_unlocked_in_func, 02421 singleton.mutex_last_unlocked_at_line)); 02422 protocol->store(&tmp_string); 02423 ret= protocol->write(); 02424 02425 /* waiting on */ 02426 protocol->prepare_for_resend(); 02427 protocol->store(STRING_WITH_LEN("waiting on condition"), scs); 02428 tmp_string.length(scs->cset-> 02429 snprintf(scs, (char*) tmp_string.ptr(), 02430 tmp_string.alloced_length(), "%s", 02431 (singleton.cond_waiting_on != COND_NONE) ? 
02432 cond_vars_names[singleton.cond_waiting_on]: 02433 "NONE")); 02434 protocol->store(&tmp_string); 02435 ret= protocol->write(); 02436 02437 Event_scheduler *scheduler= get_instance(); 02438 02439 /* workers_count */ 02440 protocol->prepare_for_resend(); 02441 protocol->store(STRING_WITH_LEN("workers_count"), scs); 02442 int_string.set((longlong) scheduler->workers_count(), scs); 02443 protocol->store(&int_string); 02444 ret= protocol->write(); 02445 02446 /* queue.elements */ 02447 protocol->prepare_for_resend(); 02448 protocol->store(STRING_WITH_LEN("queue.elements"), scs); 02449 int_string.set((longlong) scheduler->queue.elements, scs); 02450 protocol->store(&int_string); 02451 ret= protocol->write(); 02452 02453 /* scheduler_data_locked */ 02454 protocol->prepare_for_resend(); 02455 protocol->store(STRING_WITH_LEN("scheduler data locked"), scs); 02456 int_string.set((longlong) scheduler->mutex_scheduler_data_locked, scs); 02457 protocol->store(&int_string); 02458 ret= protocol->write(); 02459 } 02460 send_eof(thd); 02461 #endif 02462 DBUG_RETURN(0); 02463 }
1.4.7

