/* Copyright (C) 2003 MySQL AB

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; either version 2 of the License, or
  (at your option) any later version.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */

#ifdef USE_PRAGMA_IMPLEMENTATION
#pragma implementation        // gcc: Class implementation
#endif

#include "mysql_priv.h"
#include <myisam.h>

#include "ha_archive.h"
#include <my_dir.h>

#include <mysql/plugin.h>

/*
  First, if you want to understand storage engines you should look at
  ha_example.cc and ha_example.h.
  This example was written as a test case for a customer who needed
  a storage engine without indexes that could compress data very well.
  So, welcome to a completely compressed storage engine. This storage
  engine only does inserts. No replace, deletes, or updates. All reads are
  complete table scans. Compression is done through azip (bzip compresses
  better, but only marginally; if someone asks I could add support for
  it too, but be aware that it costs a lot more in CPU time than azip).

  We keep a file pointer open for each instance of ha_archive for each read
  but for writes we keep one open file handle just for that. We flush it
  only if we have a read occur.
  azip handles compressing lots of records
  at once much better than doing lots of little records between writes.
  It is possible to not lock on writes but this would then mean we couldn't
  handle bulk inserts as well (that is if someone was trying to read at
  the same time since we would want to flush).

  A "meta" file is kept alongside the data file. This file serves two purposes.
  The first purpose is to track the number of rows in the table. The second
  purpose is to determine if the table was closed properly or not. When the
  meta file is first opened it is marked as dirty. It is opened when the table
  itself is opened for writing. When the table is closed the new count for rows
  is written to the meta file and the file is marked as clean. If the meta file
  is opened and it is marked as dirty, it is assumed that a crash occurred. At
  this point an error occurs and the user is told to rebuild the file.
  A rebuild scans the rows and rewrites the meta file. If corruption is found
  in the data file then the meta file is not repaired.

  At some point a recovery method for such a drastic case needs to be devised.

  Locks are row level, and you will get a consistent read.

  For performance as far as table scans go it is quite fast. I don't have
  good numbers but locally it has outperformed both Innodb and MyISAM. For
  Innodb the question will be if the table can be fit into the buffer
  pool. For MyISAM it's a question of how much the file system caches the
  MyISAM file. With enough free memory MyISAM is faster. It's only when the OS
  doesn't have enough memory to cache the entire table that archive turns out
  to be any faster.

  Examples between MyISAM (packed) and Archive.

  Table with 76695844 identical rows:
  29680807 a_archive.ARZ
  920350317 a.MYD


  Table with 8991478 rows (all of Slashdot's comments):
  1922964506 comment_archive.ARZ
  2944970297 comment_text.MYD


  TODO:
   Add bzip optional support.
   Allow users to set compression level.
   Implement versioning, should be easy.
   Allow for errors, find a way to mark bad rows.
   Add optional feature so that rows can be flushed at interval (which will cause less
     compression but may speed up ordered searches).
   Checkpoint the meta file to allow for faster rebuilds.
   Dirty open (right now the meta file is repaired if a crash occurred).
   Option to allow for dirty reads, this would lower the sync calls, which would make
     inserts a lot faster, but would mean highly arbitrary reads.

    -Brian
*/
/*
  Notes on file formats.
  The Meta file is laid out as:
  check - Just an int of 254 to make sure that the file we are opening was
          never corrupted.
  version - The current version of the file format.
  rows - This is an unsigned long long which is the number of rows in the data
         file.
  check point - Reserved for future use
  auto increment - MAX value for autoincrement
  dirty - Status of the file, whether or not its values are the latest. This
          flag is what causes a repair to occur

  The data file:
  check - Just an int of 254 to make sure that the file we are opening was
          never corrupted.
  version - The current version of the file format.
  data - The data is stored in a "row +blobs" format.
00115 */ 00116 00117 /* If the archive storage engine has been inited */ 00118 static bool archive_inited= FALSE; 00119 /* Variables for archive share methods */ 00120 pthread_mutex_t archive_mutex; 00121 static HASH archive_open_tables; 00122 00123 /* The file extension */ 00124 #define ARZ ".ARZ" // The data file 00125 #define ARN ".ARN" // Files used during an optimize call 00126 #define ARM ".ARM" // Meta file 00127 /* 00128 uchar + uchar + ulonglong + ulonglong + ulonglong + ulonglong + FN_REFLEN 00129 + uchar 00130 */ 00131 #define META_BUFFER_SIZE sizeof(uchar) + sizeof(uchar) + sizeof(ulonglong) \ 00132 + sizeof(ulonglong) + sizeof(ulonglong) + sizeof(ulonglong) + FN_REFLEN \ 00133 + sizeof(uchar) 00134 00135 /* 00136 uchar + uchar 00137 */ 00138 #define DATA_BUFFER_SIZE 2 // Size of the data used in the data file 00139 #define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption 00140 00141 /* Static declarations for handerton */ 00142 static handler *archive_create_handler(TABLE_SHARE *table, MEM_ROOT *mem_root); 00143 /* 00144 Number of rows that will force a bulk insert. 00145 */ 00146 #define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2 00147 00148 handlerton archive_hton; 00149 00150 static handler *archive_create_handler(TABLE_SHARE *table, MEM_ROOT *mem_root) 00151 { 00152 return new (mem_root) ha_archive(table); 00153 } 00154 00155 /* 00156 Used for hash table that tracks open tables. 00157 */ 00158 static byte* archive_get_key(ARCHIVE_SHARE *share,uint *length, 00159 my_bool not_used __attribute__((unused))) 00160 { 00161 *length=share->table_name_length; 00162 return (byte*) share->table_name; 00163 } 00164 00165 00166 /* 00167 Initialize the archive handler. 
00168 00169 SYNOPSIS 00170 archive_db_init() 00171 void 00172 00173 RETURN 00174 FALSE OK 00175 TRUE Error 00176 */ 00177 00178 int archive_db_init() 00179 { 00180 DBUG_ENTER("archive_db_init"); 00181 if (archive_inited) 00182 DBUG_RETURN(FALSE); 00183 00184 archive_hton.state=SHOW_OPTION_YES; 00185 archive_hton.db_type=DB_TYPE_ARCHIVE_DB; 00186 archive_hton.create=archive_create_handler; 00187 archive_hton.panic=archive_db_end; 00188 archive_hton.flags=HTON_NO_FLAGS; 00189 00190 if (pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST)) 00191 goto error; 00192 if (hash_init(&archive_open_tables, system_charset_info, 32, 0, 0, 00193 (hash_get_key) archive_get_key, 0, 0)) 00194 { 00195 VOID(pthread_mutex_destroy(&archive_mutex)); 00196 } 00197 else 00198 { 00199 archive_inited= TRUE; 00200 DBUG_RETURN(FALSE); 00201 } 00202 error: 00203 DBUG_RETURN(TRUE); 00204 } 00205 00206 /* 00207 Release the archive handler. 00208 00209 SYNOPSIS 00210 archive_db_done() 00211 void 00212 00213 RETURN 00214 FALSE OK 00215 */ 00216 00217 int archive_db_done() 00218 { 00219 if (archive_inited) 00220 { 00221 hash_free(&archive_open_tables); 00222 VOID(pthread_mutex_destroy(&archive_mutex)); 00223 } 00224 archive_inited= 0; 00225 return 0; 00226 } 00227 00228 00229 int archive_db_end(ha_panic_function type) 00230 { 00231 return archive_db_done(); 00232 } 00233 00234 ha_archive::ha_archive(TABLE_SHARE *table_arg) 00235 :handler(&archive_hton, table_arg), delayed_insert(0), bulk_insert(0) 00236 { 00237 /* Set our original buffer from pre-allocated memory */ 00238 buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info); 00239 00240 /* The size of the offset value we will use for position() */ 00241 ref_length = sizeof(my_off_t); 00242 } 00243 00244 /* 00245 This method reads the header of a datafile and returns whether or not it was successful. 
00246 */ 00247 int ha_archive::read_data_header(azio_stream *file_to_read) 00248 { 00249 uchar data_buffer[DATA_BUFFER_SIZE]; 00250 DBUG_ENTER("ha_archive::read_data_header"); 00251 00252 if (azrewind(file_to_read) == -1) 00253 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); 00254 00255 if (azread(file_to_read, data_buffer, DATA_BUFFER_SIZE) != DATA_BUFFER_SIZE) 00256 DBUG_RETURN(errno ? errno : -1); 00257 00258 DBUG_PRINT("ha_archive::read_data_header", ("Check %u", data_buffer[0])); 00259 DBUG_PRINT("ha_archive::read_data_header", ("Version %u", data_buffer[1])); 00260 00261 if ((data_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) && 00262 (data_buffer[1] != (uchar)ARCHIVE_VERSION)) 00263 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); 00264 00265 DBUG_RETURN(0); 00266 } 00267 00268 /* 00269 This method writes out the header of a datafile and returns whether or not it was successful. 00270 */ 00271 int ha_archive::write_data_header(azio_stream *file_to_write) 00272 { 00273 uchar data_buffer[DATA_BUFFER_SIZE]; 00274 DBUG_ENTER("ha_archive::write_data_header"); 00275 00276 data_buffer[0]= (uchar)ARCHIVE_CHECK_HEADER; 00277 data_buffer[1]= (uchar)ARCHIVE_VERSION; 00278 00279 if (azwrite(file_to_write, &data_buffer, DATA_BUFFER_SIZE) != 00280 DATA_BUFFER_SIZE) 00281 goto error; 00282 DBUG_PRINT("ha_archive::write_data_header", ("Check %u", (uint)data_buffer[0])); 00283 DBUG_PRINT("ha_archive::write_data_header", ("Version %u", (uint)data_buffer[1])); 00284 00285 DBUG_RETURN(0); 00286 error: 00287 DBUG_RETURN(errno); 00288 } 00289 00290 /* 00291 This method reads the header of a meta file and returns whether or not it was successful. 00292 *rows will contain the current number of rows in the data file upon success. 
00293 */ 00294 int ha_archive::read_meta_file(File meta_file, ha_rows *rows, 00295 ulonglong *auto_increment, 00296 ulonglong *forced_flushes, 00297 char *real_path) 00298 { 00299 uchar meta_buffer[META_BUFFER_SIZE]; 00300 uchar *ptr= meta_buffer; 00301 ulonglong check_point; 00302 00303 DBUG_ENTER("ha_archive::read_meta_file"); 00304 00305 VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0))); 00306 if (my_read(meta_file, (byte*)meta_buffer, META_BUFFER_SIZE, 0) != META_BUFFER_SIZE) 00307 DBUG_RETURN(-1); 00308 00309 /* 00310 Parse out the meta data, we ignore version at the moment 00311 */ 00312 00313 ptr+= sizeof(uchar)*2; // Move past header 00314 *rows= (ha_rows)uint8korr(ptr); 00315 ptr+= sizeof(ulonglong); // Move past rows 00316 check_point= uint8korr(ptr); 00317 ptr+= sizeof(ulonglong); // Move past check_point 00318 *auto_increment= uint8korr(ptr); 00319 ptr+= sizeof(ulonglong); // Move past auto_increment 00320 *forced_flushes= uint8korr(ptr); 00321 ptr+= sizeof(ulonglong); // Move past forced_flush 00322 memmove(real_path, ptr, FN_REFLEN); 00323 ptr+= FN_REFLEN; // Move past the possible location of the file 00324 00325 DBUG_PRINT("ha_archive::read_meta_file", ("Check %d", (uint)meta_buffer[0])); 00326 DBUG_PRINT("ha_archive::read_meta_file", ("Version %d", (uint)meta_buffer[1])); 00327 DBUG_PRINT("ha_archive::read_meta_file", ("Rows %llu", *rows)); 00328 DBUG_PRINT("ha_archive::read_meta_file", ("Checkpoint %llu", check_point)); 00329 DBUG_PRINT("ha_archive::read_meta_file", ("Auto-Increment %llu", *auto_increment)); 00330 DBUG_PRINT("ha_archive::read_meta_file", ("Forced Flushes %llu", *forced_flushes)); 00331 DBUG_PRINT("ha_archive::read_meta_file", ("Real Path %s", real_path)); 00332 DBUG_PRINT("ha_archive::read_meta_file", ("Dirty %d", (int)(*ptr))); 00333 00334 if ((meta_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) || 00335 ((bool)(*ptr)== TRUE)) 00336 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); 00337 00338 my_sync(meta_file, MYF(MY_WME)); 00339 00340 
DBUG_RETURN(0); 00341 } 00342 00343 /* 00344 This method writes out the header of a meta file and returns whether or not it was successful. 00345 By setting dirty you say whether or not the file represents the actual state of the data file. 00346 Upon ::open() we set to dirty, and upon ::close() we set to clean. 00347 */ 00348 int ha_archive::write_meta_file(File meta_file, ha_rows rows, 00349 ulonglong auto_increment, 00350 ulonglong forced_flushes, 00351 char *real_path, 00352 bool dirty) 00353 { 00354 uchar meta_buffer[META_BUFFER_SIZE]; 00355 uchar *ptr= meta_buffer; 00356 ulonglong check_point= 0; //Reserved for the future 00357 00358 DBUG_ENTER("ha_archive::write_meta_file"); 00359 00360 *ptr= (uchar)ARCHIVE_CHECK_HEADER; 00361 ptr += sizeof(uchar); 00362 *ptr= (uchar)ARCHIVE_VERSION; 00363 ptr += sizeof(uchar); 00364 int8store(ptr, (ulonglong)rows); 00365 ptr += sizeof(ulonglong); 00366 int8store(ptr, check_point); 00367 ptr += sizeof(ulonglong); 00368 int8store(ptr, auto_increment); 00369 ptr += sizeof(ulonglong); 00370 int8store(ptr, forced_flushes); 00371 ptr += sizeof(ulonglong); 00372 // No matter what, we pad with nulls 00373 if (real_path) 00374 strncpy((char *)ptr, real_path, FN_REFLEN); 00375 else 00376 bzero(ptr, FN_REFLEN); 00377 ptr += FN_REFLEN; 00378 *ptr= (uchar)dirty; 00379 DBUG_PRINT("ha_archive::write_meta_file", ("Check %d", 00380 (uint)ARCHIVE_CHECK_HEADER)); 00381 DBUG_PRINT("ha_archive::write_meta_file", ("Version %d", 00382 (uint)ARCHIVE_VERSION)); 00383 DBUG_PRINT("ha_archive::write_meta_file", ("Rows %llu", (ulonglong)rows)); 00384 DBUG_PRINT("ha_archive::write_meta_file", ("Checkpoint %llu", check_point)); 00385 DBUG_PRINT("ha_archive::write_meta_file", ("Auto Increment %llu", 00386 auto_increment)); 00387 DBUG_PRINT("ha_archive::write_meta_file", ("Forced Flushes %llu", 00388 forced_flushes)); 00389 DBUG_PRINT("ha_archive::write_meta_file", ("Real path %s", 00390 real_path)); 00391 DBUG_PRINT("ha_archive::write_meta_file", ("Dirty 
%d", (uint)dirty)); 00392 00393 VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0))); 00394 if (my_write(meta_file, (byte *)meta_buffer, META_BUFFER_SIZE, 0) != META_BUFFER_SIZE) 00395 DBUG_RETURN(-1); 00396 00397 my_sync(meta_file, MYF(MY_WME)); 00398 00399 DBUG_RETURN(0); 00400 } 00401 00402 00403 /* 00404 We create the shared memory space that we will use for the open table. 00405 No matter what we try to get or create a share. This is so that a repair 00406 table operation can occur. 00407 00408 See ha_example.cc for a longer description. 00409 */ 00410 ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, 00411 TABLE *table, int *rc) 00412 { 00413 ARCHIVE_SHARE *share; 00414 char meta_file_name[FN_REFLEN]; 00415 uint length; 00416 char *tmp_name; 00417 DBUG_ENTER("ha_archive::get_share"); 00418 00419 pthread_mutex_lock(&archive_mutex); 00420 length=(uint) strlen(table_name); 00421 00422 if (!(share=(ARCHIVE_SHARE*) hash_search(&archive_open_tables, 00423 (byte*) table_name, 00424 length))) 00425 { 00426 if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL), 00427 &share, sizeof(*share), 00428 &tmp_name, length+1, 00429 NullS)) 00430 { 00431 pthread_mutex_unlock(&archive_mutex); 00432 *rc= HA_ERR_OUT_OF_MEM; 00433 DBUG_RETURN(NULL); 00434 } 00435 00436 share->use_count= 0; 00437 share->table_name_length= length; 00438 share->table_name= tmp_name; 00439 share->crashed= FALSE; 00440 share->archive_write_open= FALSE; 00441 fn_format(share->data_file_name, table_name, "", 00442 ARZ,MY_REPLACE_EXT|MY_UNPACK_FILENAME); 00443 fn_format(meta_file_name, table_name, "", ARM, 00444 MY_REPLACE_EXT|MY_UNPACK_FILENAME); 00445 strmov(share->table_name,table_name); 00446 /* 00447 We will use this lock for rows. 
00448 */ 00449 VOID(pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST)); 00450 if ((share->meta_file= my_open(meta_file_name, O_RDWR, MYF(0))) == -1) 00451 share->crashed= TRUE; 00452 DBUG_PRINT("info", ("archive opening (1) up write at %s", 00453 share->data_file_name)); 00454 00455 /* 00456 We read the meta file, but do not mark it dirty unless we actually do 00457 a write. 00458 */ 00459 if (read_meta_file(share->meta_file, &share->rows_recorded, 00460 &share->auto_increment_value, 00461 &share->forced_flushes, 00462 share->real_path)) 00463 share->crashed= TRUE; 00464 /* 00465 Since we now possibly no real_path, we will use it instead if it exists. 00466 */ 00467 if (*share->real_path) 00468 fn_format(share->data_file_name, share->real_path, "", ARZ, 00469 MY_REPLACE_EXT|MY_UNPACK_FILENAME); 00470 VOID(my_hash_insert(&archive_open_tables, (byte*) share)); 00471 thr_lock_init(&share->lock); 00472 } 00473 share->use_count++; 00474 DBUG_PRINT("info", ("archive table %.*s has %d open handles now", 00475 share->table_name_length, share->table_name, 00476 share->use_count)); 00477 if (share->crashed) 00478 *rc= HA_ERR_CRASHED_ON_USAGE; 00479 pthread_mutex_unlock(&archive_mutex); 00480 00481 DBUG_RETURN(share); 00482 } 00483 00484 00485 /* 00486 Free the share. 00487 See ha_example.cc for a description. 00488 */ 00489 int ha_archive::free_share(ARCHIVE_SHARE *share) 00490 { 00491 int rc= 0; 00492 DBUG_ENTER("ha_archive::free_share"); 00493 DBUG_PRINT("info", ("archive table %.*s has %d open handles on entrance", 00494 share->table_name_length, share->table_name, 00495 share->use_count)); 00496 00497 pthread_mutex_lock(&archive_mutex); 00498 if (!--share->use_count) 00499 { 00500 hash_delete(&archive_open_tables, (byte*) share); 00501 thr_lock_delete(&share->lock); 00502 VOID(pthread_mutex_destroy(&share->mutex)); 00503 /* 00504 We need to make sure we don't reset the crashed state. 
00505 If we open a crashed file, wee need to close it as crashed unless 00506 it has been repaired. 00507 Since we will close the data down after this, we go on and count 00508 the flush on close; 00509 */ 00510 share->forced_flushes++; 00511 (void)write_meta_file(share->meta_file, share->rows_recorded, 00512 share->auto_increment_value, 00513 share->forced_flushes, 00514 share->real_path, 00515 share->crashed ? TRUE :FALSE); 00516 if (share->archive_write_open) 00517 if (azclose(&(share->archive_write))) 00518 rc= 1; 00519 if (my_close(share->meta_file, MYF(0))) 00520 rc= 1; 00521 my_free((gptr) share, MYF(0)); 00522 } 00523 pthread_mutex_unlock(&archive_mutex); 00524 00525 DBUG_RETURN(rc); 00526 } 00527 00528 int ha_archive::init_archive_writer() 00529 { 00530 DBUG_ENTER("ha_archive::init_archive_writer"); 00531 (void)write_meta_file(share->meta_file, share->rows_recorded, 00532 share->auto_increment_value, 00533 share->forced_flushes, 00534 share->real_path, 00535 TRUE); 00536 00537 /* 00538 It is expensive to open and close the data files and since you can't have 00539 a gzip file that can be both read and written we keep a writer open 00540 that is shared amoung all open tables. 00541 */ 00542 if (!(azopen(&(share->archive_write), share->data_file_name, 00543 O_WRONLY|O_APPEND|O_BINARY))) 00544 { 00545 DBUG_PRINT("info", ("Could not open archive write file")); 00546 share->crashed= TRUE; 00547 DBUG_RETURN(1); 00548 } 00549 share->archive_write_open= TRUE; 00550 00551 DBUG_RETURN(0); 00552 } 00553 00554 00555 /* 00556 We just implement one additional file extension. 00557 */ 00558 static const char *ha_archive_exts[] = { 00559 ARZ, 00560 ARM, 00561 NullS 00562 }; 00563 00564 const char **ha_archive::bas_ext() const 00565 { 00566 return ha_archive_exts; 00567 } 00568 00569 00570 /* 00571 When opening a file we: 00572 Create/get our shared structure. 00573 Init out lock. 00574 We open the file we will read from. 
00575 */ 00576 int ha_archive::open(const char *name, int mode, uint open_options) 00577 { 00578 int rc= 0; 00579 DBUG_ENTER("ha_archive::open"); 00580 00581 DBUG_PRINT("info", ("archive table was opened for crash: %s", 00582 (open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no")); 00583 share= get_share(name, table, &rc); 00584 00585 if (rc == HA_ERR_CRASHED_ON_USAGE && !(open_options & HA_OPEN_FOR_REPAIR)) 00586 { 00587 free_share(share); 00588 DBUG_RETURN(rc); 00589 } 00590 else if (rc == HA_ERR_OUT_OF_MEM) 00591 { 00592 DBUG_RETURN(rc); 00593 } 00594 00595 thr_lock_data_init(&share->lock,&lock,NULL); 00596 00597 DBUG_PRINT("info", ("archive data_file_name %s", share->data_file_name)); 00598 if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY))) 00599 { 00600 if (errno == EROFS || errno == EACCES) 00601 DBUG_RETURN(my_errno= errno); 00602 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); 00603 } 00604 00605 DBUG_PRINT("info", ("archive table was crashed %s", 00606 rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no")); 00607 if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR) 00608 { 00609 DBUG_RETURN(0); 00610 } 00611 else 00612 DBUG_RETURN(rc); 00613 } 00614 00615 00616 /* 00617 Closes the file. 00618 00619 SYNOPSIS 00620 close(); 00621 00622 IMPLEMENTATION: 00623 00624 We first close this storage engines file handle to the archive and 00625 then remove our reference count to the table (and possibly free it 00626 as well). 00627 00628 RETURN 00629 0 ok 00630 1 Error 00631 */ 00632 00633 int ha_archive::close(void) 00634 { 00635 int rc= 0; 00636 DBUG_ENTER("ha_archive::close"); 00637 00638 /* First close stream */ 00639 if (azclose(&archive)) 00640 rc= 1; 00641 /* then also close share */ 00642 rc|= free_share(share); 00643 00644 DBUG_RETURN(rc); 00645 } 00646 00647 00648 /* 00649 We create our data file here. The format is pretty simple. 00650 You can read about the format of the data file above. 
00651 Unlike other storage engines we do not "pack" our data. Since we 00652 are about to do a general compression, packing would just be a waste of 00653 CPU time. If the table has blobs they are written after the row in the order 00654 of creation. 00655 */ 00656 00657 int ha_archive::create(const char *name, TABLE *table_arg, 00658 HA_CREATE_INFO *create_info) 00659 { 00660 File create_file; // We use to create the datafile and the metafile 00661 char name_buff[FN_REFLEN]; 00662 int error; 00663 DBUG_ENTER("ha_archive::create"); 00664 00665 stats.auto_increment_value= (create_info->auto_increment_value ? 00666 create_info->auto_increment_value -1 : 00667 (ulonglong) 0); 00668 00669 for (uint key= 0; key < table_arg->s->keys; key++) 00670 { 00671 KEY *pos= table_arg->key_info+key; 00672 KEY_PART_INFO *key_part= pos->key_part; 00673 KEY_PART_INFO *key_part_end= key_part + pos->key_parts; 00674 00675 for (; key_part != key_part_end; key_part++) 00676 { 00677 Field *field= key_part->field; 00678 00679 if (!(field->flags & AUTO_INCREMENT_FLAG)) 00680 { 00681 error= -1; 00682 DBUG_PRINT("info", ("Index error in creating archive table")); 00683 goto error; 00684 } 00685 } 00686 } 00687 00688 if ((create_file= my_create(fn_format(name_buff,name,"",ARM, 00689 MY_REPLACE_EXT|MY_UNPACK_FILENAME),0, 00690 O_RDWR | O_TRUNC,MYF(MY_WME))) < 0) 00691 { 00692 error= my_errno; 00693 goto error; 00694 } 00695 00696 write_meta_file(create_file, 0, stats.auto_increment_value, 0, 00697 (char *)create_info->data_file_name, 00698 FALSE); 00699 my_close(create_file,MYF(0)); 00700 00701 /* 00702 We reuse name_buff since it is available. 
00703 */ 00704 if (create_info->data_file_name) 00705 { 00706 char linkname[FN_REFLEN]; 00707 DBUG_PRINT("info", ("archive will create stream file %s", 00708 create_info->data_file_name)); 00709 00710 fn_format(name_buff, create_info->data_file_name, "", ARZ, 00711 MY_REPLACE_EXT|MY_UNPACK_FILENAME); 00712 fn_format(linkname, name, "", ARZ, 00713 MY_UNPACK_FILENAME | MY_APPEND_EXT); 00714 if ((create_file= my_create_with_symlink(linkname, name_buff, 0, 00715 O_RDWR | O_TRUNC,MYF(MY_WME))) < 0) 00716 { 00717 error= my_errno; 00718 goto error; 00719 } 00720 } 00721 else 00722 { 00723 if ((create_file= my_create(fn_format(name_buff, name,"", ARZ, 00724 MY_REPLACE_EXT|MY_UNPACK_FILENAME),0, 00725 O_RDWR | O_TRUNC,MYF(MY_WME))) < 0) 00726 { 00727 error= my_errno; 00728 goto error; 00729 } 00730 } 00731 if (!azdopen(&archive, create_file, O_WRONLY|O_BINARY)) 00732 { 00733 error= errno; 00734 goto error2; 00735 } 00736 if (write_data_header(&archive)) 00737 { 00738 error= errno; 00739 goto error3; 00740 } 00741 00742 if (azclose(&archive)) 00743 { 00744 error= errno; 00745 goto error2; 00746 } 00747 00748 DBUG_RETURN(0); 00749 00750 error3: 00751 /* We already have an error, so ignore results of azclose. */ 00752 (void)azclose(&archive); 00753 error2: 00754 my_close(create_file, MYF(0)); 00755 delete_table(name); 00756 error: 00757 /* Return error number, if we got one */ 00758 DBUG_RETURN(error ? error : -1); 00759 } 00760 00761 /* 00762 This is where the actual row is written out. 
00763 */ 00764 int ha_archive::real_write_row(byte *buf, azio_stream *writer) 00765 { 00766 my_off_t written; 00767 uint *ptr, *end; 00768 DBUG_ENTER("ha_archive::real_write_row"); 00769 00770 written= azwrite(writer, buf, table->s->reclength); 00771 DBUG_PRINT("ha_archive::real_write_row", ("Wrote %d bytes expected %d", 00772 written, table->s->reclength)); 00773 if (!delayed_insert || !bulk_insert) 00774 share->dirty= TRUE; 00775 00776 if (written != (my_off_t)table->s->reclength) 00777 DBUG_RETURN(errno ? errno : -1); 00778 /* 00779 We should probably mark the table as damagaged if the record is written 00780 but the blob fails. 00781 */ 00782 for (ptr= table->s->blob_field, end= ptr + table->s->blob_fields ; 00783 ptr != end ; 00784 ptr++) 00785 { 00786 char *data_ptr; 00787 uint32 size= ((Field_blob*) table->field[*ptr])->get_length(); 00788 00789 if (size) 00790 { 00791 ((Field_blob*) table->field[*ptr])->get_ptr(&data_ptr); 00792 written= azwrite(writer, data_ptr, (unsigned)size); 00793 if (written != (my_off_t)size) 00794 DBUG_RETURN(errno ? errno : -1); 00795 } 00796 } 00797 DBUG_RETURN(0); 00798 } 00799 00800 00801 /* 00802 Look at ha_archive::open() for an explanation of the row format. 00803 Here we just write out the row. 00804 00805 Wondering about start_bulk_insert()? We don't implement it for 00806 archive since it optimizes for lots of writes. The only save 00807 for implementing start_bulk_insert() is that we could skip 00808 setting dirty to true each time. 
00809 */ 00810 int ha_archive::write_row(byte *buf) 00811 { 00812 int rc; 00813 byte *read_buf= NULL; 00814 ulonglong temp_auto; 00815 DBUG_ENTER("ha_archive::write_row"); 00816 00817 if (share->crashed) 00818 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); 00819 00820 ha_statistic_increment(&SSV::ha_write_count); 00821 if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT) 00822 table->timestamp_field->set_time(); 00823 pthread_mutex_lock(&share->mutex); 00824 00825 if (table->next_number_field) 00826 { 00827 KEY *mkey= &table->s->key_info[0]; // We only support one key right now 00828 update_auto_increment(); 00829 temp_auto= table->next_number_field->val_int(); 00830 00831 /* 00832 Bad news, this will cause a search for the unique value which is very 00833 expensive since we will have to do a table scan which will lock up 00834 all other writers during this period. This could perhaps be optimized 00835 in the future. 00836 */ 00837 if (temp_auto == share->auto_increment_value && 00838 mkey->flags & HA_NOSAME) 00839 { 00840 rc= HA_ERR_FOUND_DUPP_KEY; 00841 goto error; 00842 } 00843 00844 if (temp_auto < share->auto_increment_value && 00845 mkey->flags & HA_NOSAME) 00846 { 00847 /* 00848 First we create a buffer that we can use for reading rows, and can pass 00849 to get_row(). 00850 */ 00851 if (!(read_buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME)))) 00852 { 00853 rc= HA_ERR_OUT_OF_MEM; 00854 goto error; 00855 } 00856 /* 00857 All of the buffer must be written out or we won't see all of the 00858 data 00859 */ 00860 azflush(&(share->archive_write), Z_SYNC_FLUSH); 00861 share->forced_flushes++; 00862 /* 00863 Set the position of the local read thread to the beginning postion. 00864 */ 00865 if (read_data_header(&archive)) 00866 { 00867 rc= HA_ERR_CRASHED_ON_USAGE; 00868 goto error; 00869 } 00870 00871 /* 00872 Now we read and check all of the rows. 
00873 if (!memcmp(table->next_number_field->ptr, mfield->ptr, mfield->max_length())) 00874 if ((longlong)temp_auto == 00875 mfield->val_int((char*)(read_buf + mfield->offset()))) 00876 */ 00877 Field *mfield= table->next_number_field; 00878 00879 while (!(get_row(&archive, read_buf))) 00880 { 00881 if (!memcmp(read_buf + mfield->offset(), table->next_number_field->ptr, 00882 mfield->max_length())) 00883 { 00884 rc= HA_ERR_FOUND_DUPP_KEY; 00885 goto error; 00886 } 00887 } 00888 } 00889 else 00890 { 00891 if (temp_auto > share->auto_increment_value) 00892 stats.auto_increment_value= share->auto_increment_value= temp_auto; 00893 } 00894 } 00895 00896 /* 00897 Notice that the global auto_increment has been increased. 00898 In case of a failed row write, we will never try to reuse the value. 00899 */ 00900 if (!share->archive_write_open) 00901 if (init_archive_writer()) 00902 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); 00903 00904 /* 00905 Varchar structures are constant in size but are not cleaned up request 00906 to request. The following sets all unused space to null to improve 00907 compression. 00908 */ 00909 for (Field **field=table->field ; *field ; field++) 00910 { 00911 DBUG_PRINT("archive",("Pack is %d\n", (*field)->pack_length())); 00912 DBUG_PRINT("archive",("MyPack is %d\n", (*field)->data_length((char*) buf + (*field)->offset()))); 00913 if ((*field)->real_type() == MYSQL_TYPE_VARCHAR) 00914 { 00915 uint actual_length= (*field)->data_length((char*) buf + (*field)->offset()); 00916 uint offset= (*field)->offset() + actual_length + 00917 (actual_length > 255 ? 2 : 1); 00918 DBUG_PRINT("archive",("Offset is %d -> %d\n", actual_length, offset)); 00919 /* 00920 if ((*field)->pack_length() + (*field)->offset() != offset) 00921 bzero(buf + offset, (size_t)((*field)->pack_length() + (actual_length > 255 ? 
2 : 1) - (*field)->data_length)); 00922 */ 00923 } 00924 } 00925 00926 share->rows_recorded++; 00927 rc= real_write_row(buf, &(share->archive_write)); 00928 error: 00929 pthread_mutex_unlock(&share->mutex); 00930 if (read_buf) 00931 my_free((gptr) read_buf, MYF(0)); 00932 00933 DBUG_RETURN(rc); 00934 } 00935 00936 00937 void ha_archive::get_auto_increment(ulonglong offset, ulonglong increment, 00938 ulonglong nb_desired_values, 00939 ulonglong *first_value, 00940 ulonglong *nb_reserved_values) 00941 { 00942 *nb_reserved_values= 1; 00943 *first_value= share->auto_increment_value + 1; 00944 } 00945 00946 /* Initialized at each key walk (called multiple times unlike rnd_init()) */ 00947 int ha_archive::index_init(uint keynr, bool sorted) 00948 { 00949 DBUG_ENTER("ha_archive::index_init"); 00950 active_index= keynr; 00951 DBUG_RETURN(0); 00952 } 00953 00954 00955 /* 00956 No indexes, so if we get a request for an index search since we tell 00957 the optimizer that we have unique indexes, we scan 00958 */ 00959 int ha_archive::index_read(byte *buf, const byte *key, 00960 uint key_len, enum ha_rkey_function find_flag) 00961 { 00962 int rc; 00963 DBUG_ENTER("ha_archive::index_read"); 00964 rc= index_read_idx(buf, active_index, key, key_len, find_flag); 00965 DBUG_RETURN(rc); 00966 } 00967 00968 00969 int ha_archive::index_read_idx(byte *buf, uint index, const byte *key, 00970 uint key_len, enum ha_rkey_function find_flag) 00971 { 00972 int rc= 0; 00973 bool found= 0; 00974 KEY *mkey= &table->s->key_info[index]; 00975 current_k_offset= mkey->key_part->offset; 00976 current_key= key; 00977 current_key_len= key_len; 00978 00979 00980 DBUG_ENTER("ha_archive::index_read_idx"); 00981 00982 /* 00983 All of the buffer must be written out or we won't see all of the 00984 data 00985 */ 00986 pthread_mutex_lock(&share->mutex); 00987 azflush(&(share->archive_write), Z_SYNC_FLUSH); 00988 share->forced_flushes++; 00989 pthread_mutex_unlock(&share->mutex); 00990 00991 /* 00992 Set the 
position of the local read thread to the beginning postion. 00993 */ 00994 if (read_data_header(&archive)) 00995 { 00996 rc= HA_ERR_CRASHED_ON_USAGE; 00997 goto error; 00998 } 00999 01000 while (!(get_row(&archive, buf))) 01001 { 01002 if (!memcmp(current_key, buf + current_k_offset, current_key_len)) 01003 { 01004 found= 1; 01005 break; 01006 } 01007 } 01008 01009 if (found) 01010 DBUG_RETURN(0); 01011 01012 error: 01013 DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE); 01014 } 01015 01016 01017 int ha_archive::index_next(byte * buf) 01018 { 01019 bool found= 0; 01020 01021 DBUG_ENTER("ha_archive::index_next"); 01022 01023 while (!(get_row(&archive, buf))) 01024 { 01025 if (!memcmp(current_key, buf+current_k_offset, current_key_len)) 01026 { 01027 found= 1; 01028 break; 01029 } 01030 } 01031 01032 DBUG_RETURN(found ? 0 : HA_ERR_END_OF_FILE); 01033 } 01034 01035 /* 01036 All calls that need to scan the table start with this method. If we are told 01037 that it is a table scan we rewind the file to the beginning, otherwise 01038 we assume the position will be set. 01039 */ 01040 01041 int ha_archive::rnd_init(bool scan) 01042 { 01043 DBUG_ENTER("ha_archive::rnd_init"); 01044 01045 if (share->crashed) 01046 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); 01047 01048 /* We rewind the file so that we can read from the beginning if scan */ 01049 if (scan) 01050 { 01051 scan_rows= share->rows_recorded; 01052 DBUG_PRINT("info", ("archive will retrieve %llu rows", scan_rows)); 01053 stats.records= 0; 01054 01055 /* 01056 If dirty, we lock, and then reset/flush the data. 01057 I found that just calling azflush() doesn't always work. 
01058 */ 01059 if (share->dirty == TRUE) 01060 { 01061 pthread_mutex_lock(&share->mutex); 01062 if (share->dirty == TRUE) 01063 { 01064 DBUG_PRINT("info", ("archive flushing out rows for scan")); 01065 azflush(&(share->archive_write), Z_SYNC_FLUSH); 01066 share->forced_flushes++; 01067 share->dirty= FALSE; 01068 } 01069 pthread_mutex_unlock(&share->mutex); 01070 } 01071 01072 if (read_data_header(&archive)) 01073 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); 01074 } 01075 01076 DBUG_RETURN(0); 01077 } 01078 01079 01080 /* 01081 This is the method that is used to read a row. It assumes that the row is 01082 positioned where you want it. 01083 */ 01084 int ha_archive::get_row(azio_stream *file_to_read, byte *buf) 01085 { 01086 int read; // Bytes read, azread() returns int 01087 uint *ptr, *end; 01088 char *last; 01089 size_t total_blob_length= 0; 01090 MY_BITMAP *read_set= table->read_set; 01091 DBUG_ENTER("ha_archive::get_row"); 01092 01093 read= azread(file_to_read, buf, table->s->reclength); 01094 DBUG_PRINT("ha_archive::get_row", ("Read %d bytes expected %d", read, 01095 table->s->reclength)); 01096 01097 if (read == Z_STREAM_ERROR) 01098 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); 01099 01100 /* If we read nothing we are at the end of the file */ 01101 if (read == 0) 01102 DBUG_RETURN(HA_ERR_END_OF_FILE); 01103 01104 /* 01105 If the record is the wrong size, the file is probably damaged, unless 01106 we are dealing with a delayed insert or a bulk insert. 
01107 */ 01108 if ((ulong) read != table->s->reclength) 01109 DBUG_RETURN(HA_ERR_END_OF_FILE); 01110 01111 /* Calculate blob length, we use this for our buffer */ 01112 for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ; 01113 ptr != end ; 01114 ptr++) 01115 { 01116 if (bitmap_is_set(read_set, 01117 (((Field_blob*) table->field[*ptr])->field_index))) 01118 total_blob_length += ((Field_blob*) table->field[*ptr])->get_length(); 01119 } 01120 01121 /* Adjust our row buffer if we need be */ 01122 buffer.alloc(total_blob_length); 01123 last= (char *)buffer.ptr(); 01124 01125 /* Loop through our blobs and read them */ 01126 for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ; 01127 ptr != end ; 01128 ptr++) 01129 { 01130 size_t size= ((Field_blob*) table->field[*ptr])->get_length(); 01131 if (size) 01132 { 01133 if (bitmap_is_set(read_set, 01134 ((Field_blob*) table->field[*ptr])->field_index)) 01135 { 01136 read= azread(file_to_read, last, size); 01137 if ((size_t) read != size) 01138 DBUG_RETURN(HA_ERR_END_OF_FILE); 01139 ((Field_blob*) table->field[*ptr])->set_ptr(size, last); 01140 last += size; 01141 } 01142 else 01143 { 01144 (void)azseek(file_to_read, size, SEEK_CUR); 01145 } 01146 } 01147 } 01148 DBUG_RETURN(0); 01149 } 01150 01151 01152 /* 01153 Called during ORDER BY. Its position is either from being called sequentially 01154 or by having had ha_archive::rnd_pos() called before it is called. 
01155 */ 01156 01157 int ha_archive::rnd_next(byte *buf) 01158 { 01159 int rc; 01160 DBUG_ENTER("ha_archive::rnd_next"); 01161 01162 if (share->crashed) 01163 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); 01164 01165 if (!scan_rows) 01166 DBUG_RETURN(HA_ERR_END_OF_FILE); 01167 scan_rows--; 01168 01169 ha_statistic_increment(&SSV::ha_read_rnd_next_count); 01170 current_position= aztell(&archive); 01171 rc= get_row(&archive, buf); 01172 01173 01174 if (rc != HA_ERR_END_OF_FILE) 01175 stats.records++; 01176 01177 DBUG_RETURN(rc); 01178 } 01179 01180 01181 /* 01182 Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after 01183 each call to ha_archive::rnd_next() if an ordering of the rows is 01184 needed. 01185 */ 01186 01187 void ha_archive::position(const byte *record) 01188 { 01189 DBUG_ENTER("ha_archive::position"); 01190 my_store_ptr(ref, ref_length, current_position); 01191 DBUG_VOID_RETURN; 01192 } 01193 01194 01195 /* 01196 This is called after a table scan for each row if the results of the 01197 scan need to be ordered. It will take *pos and use it to move the 01198 cursor in the file so that the next row that is called is the 01199 correctly ordered row. 01200 */ 01201 01202 int ha_archive::rnd_pos(byte * buf, byte *pos) 01203 { 01204 DBUG_ENTER("ha_archive::rnd_pos"); 01205 ha_statistic_increment(&SSV::ha_read_rnd_next_count); 01206 current_position= (my_off_t)my_get_ptr(pos, ref_length); 01207 (void)azseek(&archive, current_position, SEEK_SET); 01208 01209 DBUG_RETURN(get_row(&archive, buf)); 01210 } 01211 01212 /* 01213 This method repairs the meta file. It does this by walking the datafile and 01214 rewriting the meta file. Currently it does this by calling optimize with 01215 the extended flag. 
01216 */ 01217 int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt) 01218 { 01219 DBUG_ENTER("ha_archive::repair"); 01220 check_opt->flags= T_EXTEND; 01221 int rc= optimize(thd, check_opt); 01222 01223 if (rc) 01224 DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR); 01225 01226 share->crashed= FALSE; 01227 DBUG_RETURN(0); 01228 } 01229 01230 /* 01231 The table can become fragmented if data was inserted, read, and then 01232 inserted again. What we do is open up the file and recompress it completely. 01233 */ 01234 int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt) 01235 { 01236 DBUG_ENTER("ha_archive::optimize"); 01237 int rc= 0; 01238 azio_stream writer; 01239 char writer_filename[FN_REFLEN]; 01240 01241 /* Open up the writer if we haven't yet */ 01242 if (!share->archive_write_open) 01243 init_archive_writer(); 01244 01245 /* Flush any waiting data */ 01246 azflush(&(share->archive_write), Z_SYNC_FLUSH); 01247 share->forced_flushes++; 01248 01249 /* Lets create a file to contain the new data */ 01250 fn_format(writer_filename, share->table_name, "", ARN, 01251 MY_REPLACE_EXT|MY_UNPACK_FILENAME); 01252 01253 if (!(azopen(&writer, writer_filename, O_CREAT|O_WRONLY|O_TRUNC|O_BINARY))) 01254 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); 01255 01256 /* 01257 An extended rebuild is a lot more effort. We open up each row and re-record it. 01258 Any dead rows are removed (aka rows that may have been partially recorded). 01259 */ 01260 01261 if (check_opt->flags == T_EXTEND) 01262 { 01263 DBUG_PRINT("info", ("archive extended rebuild")); 01264 byte *buf; 01265 01266 /* 01267 First we create a buffer that we can use for reading rows, and can pass 01268 to get_row(). 01269 */ 01270 if (!(buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME)))) 01271 { 01272 rc= HA_ERR_OUT_OF_MEM; 01273 goto error; 01274 } 01275 01276 /* 01277 Now we will rewind the archive file so that we are positioned at the 01278 start of the file. 
01279 */ 01280 rc= read_data_header(&archive); 01281 01282 /* 01283 Assuming now error from rewinding the archive file, we now write out the 01284 new header for out data file. 01285 */ 01286 if (!rc) 01287 rc= write_data_header(&writer); 01288 01289 /* 01290 On success of writing out the new header, we now fetch each row and 01291 insert it into the new archive file. 01292 */ 01293 if (!rc) 01294 { 01295 share->rows_recorded= 0; 01296 stats.auto_increment_value= share->auto_increment_value= 0; 01297 while (!(rc= get_row(&archive, buf))) 01298 { 01299 real_write_row(buf, &writer); 01300 if (table->found_next_number_field) 01301 { 01302 Field *field= table->found_next_number_field; 01303 ulonglong auto_value= 01304 (ulonglong) field->val_int((char*)(buf + field->offset())); 01305 if (share->auto_increment_value < auto_value) 01306 stats.auto_increment_value= share->auto_increment_value= 01307 auto_value; 01308 } 01309 share->rows_recorded++; 01310 } 01311 } 01312 DBUG_PRINT("info", ("recovered %llu archive rows", share->rows_recorded)); 01313 01314 my_free((char*)buf, MYF(0)); 01315 if (rc && rc != HA_ERR_END_OF_FILE) 01316 goto error; 01317 } 01318 else 01319 { 01320 DBUG_PRINT("info", ("archive quick rebuild")); 01321 /* 01322 The quick method is to just read the data raw, and then compress it directly. 
01323 */ 01324 int read; // Bytes read, azread() returns int 01325 char block[IO_SIZE]; 01326 if (azrewind(&archive) == -1) 01327 { 01328 rc= HA_ERR_CRASHED_ON_USAGE; 01329 DBUG_PRINT("info", ("archive HA_ERR_CRASHED_ON_USAGE")); 01330 goto error; 01331 } 01332 01333 while ((read= azread(&archive, block, IO_SIZE)) > 0) 01334 azwrite(&writer, block, read); 01335 } 01336 01337 azclose(&writer); 01338 share->dirty= FALSE; 01339 share->forced_flushes= 0; 01340 01341 // now we close both our writer and our reader for the rename 01342 azclose(&(share->archive_write)); 01343 azclose(&archive); 01344 01345 // make the file we just wrote be our data file 01346 rc = my_rename(writer_filename,share->data_file_name,MYF(0)); 01347 01348 /* 01349 now open the shared writer back up 01350 we don't check rc here because we want to open the file back up even 01351 if the optimize failed but we will return rc below so that we will 01352 know it failed. 01353 We also need to reopen our read descriptor since it has changed. 01354 */ 01355 DBUG_PRINT("info", ("Reopening archive data file")); 01356 if (!azopen(&(share->archive_write), share->data_file_name, 01357 O_WRONLY|O_APPEND|O_BINARY) || 01358 !azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)) 01359 { 01360 DBUG_PRINT("info", ("Could not open archive write file")); 01361 rc= HA_ERR_CRASHED_ON_USAGE; 01362 } 01363 01364 DBUG_RETURN(rc); 01365 error: 01366 azclose(&writer); 01367 01368 DBUG_RETURN(rc); 01369 } 01370 01371 /* 01372 Below is an example of how to setup row level locking. 01373 */ 01374 THR_LOCK_DATA **ha_archive::store_lock(THD *thd, 01375 THR_LOCK_DATA **to, 01376 enum thr_lock_type lock_type) 01377 { 01378 if (lock_type == TL_WRITE_DELAYED) 01379 delayed_insert= TRUE; 01380 else 01381 delayed_insert= FALSE; 01382 01383 if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) 01384 { 01385 /* 01386 Here is where we get into the guts of a row level lock. 
01387 If TL_UNLOCK is set 01388 If we are not doing a LOCK TABLE or DISCARD/IMPORT 01389 TABLESPACE, then allow multiple writers 01390 */ 01391 01392 if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && 01393 lock_type <= TL_WRITE) && !thd_in_lock_tables(thd) 01394 && !thd_tablespace_op(thd)) 01395 lock_type = TL_WRITE_ALLOW_WRITE; 01396 01397 /* 01398 In queries of type INSERT INTO t1 SELECT ... FROM t2 ... 01399 MySQL would use the lock TL_READ_NO_INSERT on t2, and that 01400 would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts 01401 to t2. Convert the lock to a normal read lock to allow 01402 concurrent inserts to t2. 01403 */ 01404 01405 if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd)) 01406 lock_type = TL_READ; 01407 01408 lock.type=lock_type; 01409 } 01410 01411 *to++= &lock; 01412 01413 return to; 01414 } 01415 01416 void ha_archive::update_create_info(HA_CREATE_INFO *create_info) 01417 { 01418 ha_archive::info(HA_STATUS_AUTO | HA_STATUS_CONST); 01419 if (!(create_info->used_fields & HA_CREATE_USED_AUTO)) 01420 { 01421 create_info->auto_increment_value= stats.auto_increment_value; 01422 } 01423 if (*share->real_path) 01424 create_info->data_file_name= share->real_path; 01425 } 01426 01427 01428 /* 01429 Hints for optimizer, see ha_tina for more information 01430 */ 01431 void ha_archive::info(uint flag) 01432 { 01433 DBUG_ENTER("ha_archive::info"); 01434 /* 01435 This should be an accurate number now, though bulk and delayed inserts can 01436 cause the number to be inaccurate. 
01437 */ 01438 stats.records= share->rows_recorded; 01439 stats.deleted= 0; 01440 /* Costs quite a bit more to get all information */ 01441 if (flag & HA_STATUS_TIME) 01442 { 01443 MY_STAT file_stat; // Stat information for the data file 01444 01445 VOID(my_stat(share->data_file_name, &file_stat, MYF(MY_WME))); 01446 01447 stats.mean_rec_length= table->s->reclength + buffer.alloced_length(); 01448 stats.data_file_length= file_stat.st_size; 01449 stats.create_time= file_stat.st_ctime; 01450 stats.update_time= file_stat.st_mtime; 01451 stats.max_data_file_length= share->rows_recorded * stats.mean_rec_length; 01452 } 01453 stats.delete_length= 0; 01454 stats.index_file_length=0; 01455 01456 if (flag & HA_STATUS_AUTO) 01457 stats.auto_increment_value= share->auto_increment_value; 01458 01459 DBUG_VOID_RETURN; 01460 } 01461 01462 01463 /* 01464 This method tells us that a bulk insert operation is about to occur. We set 01465 a flag which will keep write_row from saying that its data is dirty. This in 01466 turn will keep selects from causing a sync to occur. 01467 Basically, yet another optimizations to keep compression working well. 01468 */ 01469 void ha_archive::start_bulk_insert(ha_rows rows) 01470 { 01471 DBUG_ENTER("ha_archive::start_bulk_insert"); 01472 if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT) 01473 bulk_insert= TRUE; 01474 DBUG_VOID_RETURN; 01475 } 01476 01477 01478 /* 01479 Other side of start_bulk_insert, is end_bulk_insert. Here we turn off the bulk insert 01480 flag, and set the share dirty so that the next select will call sync for us. 01481 */ 01482 int ha_archive::end_bulk_insert() 01483 { 01484 DBUG_ENTER("ha_archive::end_bulk_insert"); 01485 bulk_insert= FALSE; 01486 share->dirty= TRUE; 01487 DBUG_RETURN(0); 01488 } 01489 01490 /* 01491 We cancel a truncate command. The only way to delete an archive table is to drop it. 01492 This is done for security reasons. 
In a later version we will enable this by 01493 allowing the user to select a different row format. 01494 */ 01495 int ha_archive::delete_all_rows() 01496 { 01497 DBUG_ENTER("ha_archive::delete_all_rows"); 01498 DBUG_RETURN(HA_ERR_WRONG_COMMAND); 01499 } 01500 01501 /* 01502 We just return state if asked. 01503 */ 01504 bool ha_archive::is_crashed() const 01505 { 01506 DBUG_ENTER("ha_archive::is_crashed"); 01507 DBUG_RETURN(share->crashed); 01508 } 01509 01510 /* 01511 Simple scan of the tables to make sure everything is ok. 01512 */ 01513 01514 int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt) 01515 { 01516 int rc= 0; 01517 byte *buf; 01518 const char *old_proc_info; 01519 ha_rows count= share->rows_recorded; 01520 DBUG_ENTER("ha_archive::check"); 01521 01522 old_proc_info= thd_proc_info(thd, "Checking table"); 01523 /* Flush any waiting data */ 01524 azflush(&(share->archive_write), Z_SYNC_FLUSH); 01525 share->forced_flushes++; 01526 01527 /* 01528 First we create a buffer that we can use for reading rows, and can pass 01529 to get_row(). 01530 */ 01531 if (!(buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME)))) 01532 rc= HA_ERR_OUT_OF_MEM; 01533 01534 /* 01535 Now we will rewind the archive file so that we are positioned at the 01536 start of the file. 01537 */ 01538 if (!rc) 01539 read_data_header(&archive); 01540 01541 if (!rc) 01542 while (!(rc= get_row(&archive, buf))) 01543 count--; 01544 01545 my_free((char*)buf, MYF(0)); 01546 01547 thd_proc_info(thd, old_proc_info); 01548 01549 if ((rc && rc != HA_ERR_END_OF_FILE) || count) 01550 { 01551 share->crashed= FALSE; 01552 DBUG_RETURN(HA_ADMIN_CORRUPT); 01553 } 01554 else 01555 { 01556 DBUG_RETURN(HA_ADMIN_OK); 01557 } 01558 } 01559 01560 /* 01561 Check and repair the table if needed. 
01562 */ 01563 bool ha_archive::check_and_repair(THD *thd) 01564 { 01565 HA_CHECK_OPT check_opt; 01566 DBUG_ENTER("ha_archive::check_and_repair"); 01567 01568 check_opt.init(); 01569 01570 DBUG_RETURN(repair(thd, &check_opt)); 01571 } 01572 01573 struct st_mysql_storage_engine archive_storage_engine= 01574 { MYSQL_HANDLERTON_INTERFACE_VERSION, &archive_hton }; 01575 01576 mysql_declare_plugin(archive) 01577 { 01578 MYSQL_STORAGE_ENGINE_PLUGIN, 01579 &archive_storage_engine, 01580 "ARCHIVE", 01581 "Brian Aker, MySQL AB", 01582 "Archive storage engine", 01583 archive_db_init, /* Plugin Init */ 01584 archive_db_done, /* Plugin Deinit */ 01585 0x0100 /* 1.0 */, 01586 0 01587 } 01588 mysql_declare_plugin_end; 01589
1.4.7

