00001 /* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB 00002 00003 This program is free software; you can redistribute it and/or modify 00004 it under the terms of the GNU General Public License as published by 00005 the Free Software Foundation; either version 2 of the License, or 00006 (at your option) any later version. 00007 00008 This program is distributed in the hope that it will be useful, 00009 but WITHOUT ANY WARRANTY; without even the implied warranty of 00010 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00011 GNU General Public License for more details. 00012 00013 You should have received a copy of the GNU General Public License 00014 along with this program; if not, write to the Free Software 00015 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ 00016 00017 /* 00018 Creates a index for a database by reading keys, sorting them and outputing 00019 them in sorted order through SORT_INFO functions. 00020 */ 00021 00022 #include "fulltext.h" 00023 #if defined(MSDOS) || defined(__WIN__) 00024 #include <fcntl.h> 00025 #else 00026 #include <stddef.h> 00027 #endif 00028 #include <queues.h> 00029 00030 /* static variables */ 00031 00032 #undef MIN_SORT_MEMORY 00033 #undef MYF_RW 00034 #undef DISK_BUFFER_SIZE 00035 00036 #define MERGEBUFF 15 00037 #define MERGEBUFF2 31 00038 #define MIN_SORT_MEMORY (4096-MALLOC_OVERHEAD) 00039 #define MYF_RW MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL) 00040 #define DISK_BUFFER_SIZE (IO_SIZE*16) 00041 00042 00043 /* 00044 Pointers of functions for store and read keys from temp file 00045 */ 00046 00047 extern void print_error _VARARGS((const char *fmt,...)); 00048 00049 /* Functions defined in this file */ 00050 00051 static ha_rows NEAR_F find_all_keys(MI_SORT_PARAM *info,uint keys, 00052 uchar **sort_keys, 00053 DYNAMIC_ARRAY *buffpek,int *maxbuffer, 00054 IO_CACHE *tempfile, 00055 IO_CACHE *tempfile_for_exceptions); 00056 static int NEAR_F write_keys(MI_SORT_PARAM *info,uchar **sort_keys, 00057 uint count, BUFFPEK *buffpek,IO_CACHE *tempfile); 00058 static int NEAR_F write_key(MI_SORT_PARAM *info, uchar *key, 00059 IO_CACHE *tempfile); 00060 static int NEAR_F write_index(MI_SORT_PARAM *info,uchar * *sort_keys, 00061 uint count); 00062 static int NEAR_F merge_many_buff(MI_SORT_PARAM *info,uint keys, 00063 uchar * *sort_keys, 00064 BUFFPEK *buffpek,int *maxbuffer, 00065 IO_CACHE *t_file); 00066 static uint NEAR_F read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek, 00067 uint sort_length); 00068 static int NEAR_F merge_buffers(MI_SORT_PARAM *info,uint keys, 00069 IO_CACHE *from_file, IO_CACHE *to_file, 00070 uchar * *sort_keys, BUFFPEK *lastbuff, 00071 BUFFPEK *Fb, BUFFPEK *Tb); 00072 static int NEAR_F merge_index(MI_SORT_PARAM *,uint,uchar **,BUFFPEK *, int, 00073 IO_CACHE *); 00074 static int flush_ft_buf(MI_SORT_PARAM *info); 00075 00076 static int NEAR_F write_keys_varlen(MI_SORT_PARAM *info,uchar **sort_keys, 00077 uint count, BUFFPEK *buffpek, 00078 IO_CACHE *tempfile); 00079 static uint NEAR_F read_to_buffer_varlen(IO_CACHE *fromfile,BUFFPEK *buffpek, 00080 uint sort_length); 00081 static int NEAR_F write_merge_key(MI_SORT_PARAM *info, IO_CACHE *to_file, 00082 char *key, uint sort_length, uint count); 00083 static int NEAR_F write_merge_key_varlen(MI_SORT_PARAM *info, 00084 IO_CACHE *to_file, 00085 char* key, uint sort_length, 00086 uint count); 00087 static inline int 00088 my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, byte *bufs); 00089 00090 /* 00091 Creates a index of sorted keys 00092 00093 SYNOPSIS 00094 _create_index_by_sort() 00095 info Sort parameters 00096 no_messages Set to 1 if no output 00097 sortbuff_size Size if sortbuffer to allocate 00098 00099 RESULT 00100 0 ok 00101 <> 0 Error 00102 */ 00103 00104 int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages, 00105 ulong sortbuff_size) 00106 { 00107 int error,maxbuffer,skr; 00108 uint memavl,old_memavl,keys,sort_length; 00109 DYNAMIC_ARRAY buffpek; 00110 ha_rows records; 00111 uchar **sort_keys; 00112 IO_CACHE tempfile, tempfile_for_exceptions; 00113 DBUG_ENTER("_create_index_by_sort"); 00114 DBUG_PRINT("enter",("sort_length: %d", info->key_length)); 00115 00116 if (info->keyinfo->flag & HA_VAR_LENGTH_KEY) 00117 { 00118 info->write_keys=write_keys_varlen; 00119 info->read_to_buffer=read_to_buffer_varlen; 00120 info->write_key=write_merge_key_varlen; 00121 } 00122 else 00123 { 00124 info->write_keys=write_keys; 00125 info->read_to_buffer=read_to_buffer; 00126 info->write_key=write_merge_key; 00127 } 00128 00129 my_b_clear(&tempfile); 00130 my_b_clear(&tempfile_for_exceptions); 00131 bzero((char*) &buffpek,sizeof(buffpek)); 00132 sort_keys= (uchar **) NULL; error= 1; 00133 maxbuffer=1; 00134 00135 memavl=max(sortbuff_size,MIN_SORT_MEMORY); 00136 records= info->sort_info->max_records; 00137 sort_length= info->key_length; 00138 LINT_INIT(keys); 00139 00140 while (memavl >= MIN_SORT_MEMORY) 00141 { 00142 if ((my_off_t) (records+1)*(sort_length+sizeof(char*)) <= 00143 (my_off_t) memavl) 00144 keys= records+1; 00145 else 00146 do 00147 { 00148 skr=maxbuffer; 00149 if (memavl < sizeof(BUFFPEK)*(uint) maxbuffer || 00150 (keys=(memavl-sizeof(BUFFPEK)*(uint) maxbuffer)/ 00151 (sort_length+sizeof(char*))) <= 1) 00152 { 00153 mi_check_print_error(info->sort_info->param, 00154 "sort_buffer_size is to small"); 00155 goto err; 00156 } 00157 } 00158 while ((maxbuffer= (int) (records/(keys-1)+1)) != skr); 00159 00160 if ((sort_keys=(uchar **)my_malloc(keys*(sort_length+sizeof(char*))+ 00161 HA_FT_MAXBYTELEN, MYF(0)))) 00162 { 00163 if (my_init_dynamic_array(&buffpek, sizeof(BUFFPEK), maxbuffer, 00164 maxbuffer/2)) 00165 { 00166 my_free((gptr) sort_keys,MYF(0)); 00167 sort_keys= 0; 00168 } 00169 else 00170 break; 00171 } 00172 old_memavl=memavl; 00173 if ((memavl=memavl/4*3) < MIN_SORT_MEMORY && old_memavl > MIN_SORT_MEMORY) 00174 memavl=MIN_SORT_MEMORY; 00175 } 00176 if (memavl < MIN_SORT_MEMORY) 00177 { 00178 mi_check_print_error(info->sort_info->param,"Sort buffer to small"); /* purecov: tested */ 00179 goto err; /* purecov: tested */ 00180 } 00181 (*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */ 00182 00183 if (!no_messages) 00184 printf(" - Searching for keys, allocating buffer for %d keys\n",keys); 00185 00186 if ((records=find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer, 00187 &tempfile,&tempfile_for_exceptions)) 00188 == HA_POS_ERROR) 00189 goto err; /* purecov: tested */ 00190 if (maxbuffer == 0) 00191 { 00192 if (!no_messages) 00193 printf(" - Dumping %lu keys\n", (ulong) records); 00194 if (write_index(info,sort_keys, (uint) records)) 00195 goto err; /* purecov: inspected */ 00196 } 00197 else 00198 { 00199 keys=(keys*(sort_length+sizeof(char*)))/sort_length; 00200 if (maxbuffer >= MERGEBUFF2) 00201 { 00202 if (!no_messages) 00203 printf(" - Merging %lu keys\n", (ulong) records); /* purecov: tested */ 00204 if (merge_many_buff(info,keys,sort_keys, 00205 dynamic_element(&buffpek,0,BUFFPEK *),&maxbuffer,&tempfile)) 00206 goto err; /* purecov: inspected */ 00207 } 00208 if (flush_io_cache(&tempfile) || 00209 reinit_io_cache(&tempfile,READ_CACHE,0L,0,0)) 00210 goto err; /* purecov: inspected */ 00211 if (!no_messages) 00212 printf(" - Last merge and dumping keys\n"); /* purecov: tested */ 00213 if (merge_index(info,keys,sort_keys,dynamic_element(&buffpek,0,BUFFPEK *), 00214 maxbuffer,&tempfile)) 00215 goto err; /* purecov: inspected */ 00216 } 00217 00218 if (flush_ft_buf(info) || flush_pending_blocks(info)) 00219 goto err; 00220 00221 if (my_b_inited(&tempfile_for_exceptions)) 00222 { 00223 MI_INFO *index=info->sort_info->info; 00224 uint keyno=info->key; 00225 uint key_length, ref_length=index->s->rec_reflength; 00226 00227 if (!no_messages) 00228 printf(" - Adding exceptions\n"); /* purecov: tested */ 00229 if (flush_io_cache(&tempfile_for_exceptions) || 00230 reinit_io_cache(&tempfile_for_exceptions,READ_CACHE,0L,0,0)) 00231 goto err; 00232 00233 while (!my_b_read(&tempfile_for_exceptions,(byte*)&key_length, 00234 sizeof(key_length)) 00235 && !my_b_read(&tempfile_for_exceptions,(byte*)sort_keys, 00236 (uint) key_length)) 00237 { 00238 if (_mi_ck_write(index,keyno,(uchar*) sort_keys,key_length-ref_length)) 00239 goto err; 00240 } 00241 } 00242 00243 error =0; 00244 00245 err: 00246 if (sort_keys) 00247 my_free((gptr) sort_keys,MYF(0)); 00248 delete_dynamic(&buffpek); 00249 close_cached_file(&tempfile); 00250 close_cached_file(&tempfile_for_exceptions); 00251 00252 DBUG_RETURN(error ? -1 : 0); 00253 } /* _create_index_by_sort */ 00254 00255 00256 /* Search after all keys and place them in a temp. file */ 00257 00258 static ha_rows NEAR_F find_all_keys(MI_SORT_PARAM *info, uint keys, 00259 uchar **sort_keys, DYNAMIC_ARRAY *buffpek, 00260 int *maxbuffer, IO_CACHE *tempfile, 00261 IO_CACHE *tempfile_for_exceptions) 00262 { 00263 int error; 00264 uint idx; 00265 DBUG_ENTER("find_all_keys"); 00266 00267 idx=error=0; 00268 sort_keys[0]=(uchar*) (sort_keys+keys); 00269 00270 while (!(error=(*info->key_read)(info,sort_keys[idx]))) 00271 { 00272 if (info->real_key_length > info->key_length) 00273 { 00274 if (write_key(info,sort_keys[idx],tempfile_for_exceptions)) 00275 DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */ 00276 continue; 00277 } 00278 00279 if (++idx == keys) 00280 { 00281 if (info->write_keys(info,sort_keys,idx-1,(BUFFPEK *)alloc_dynamic(buffpek), 00282 tempfile)) 00283 DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */ 00284 00285 sort_keys[0]=(uchar*) (sort_keys+keys); 00286 memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length); 00287 idx=1; 00288 } 00289 sort_keys[idx]=sort_keys[idx-1]+info->key_length; 00290 } 00291 if (error > 0) 00292 DBUG_RETURN(HA_POS_ERROR); /* Aborted by get_key */ /* purecov: inspected */ 00293 if (buffpek->elements) 00294 { 00295 if (info->write_keys(info,sort_keys,idx,(BUFFPEK *)alloc_dynamic(buffpek), 00296 tempfile)) 00297 DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */ 00298 *maxbuffer=buffpek->elements-1; 00299 } 00300 else 00301 *maxbuffer=0; 00302 00303 DBUG_RETURN((*maxbuffer)*(keys-1)+idx); 00304 } /* find_all_keys */ 00305 00306 00307 #ifdef THREAD 00308 /* Search after all keys and place them in a temp. file */ 00309 00310 pthread_handler_t thr_find_all_keys(void *arg) 00311 { 00312 MI_SORT_PARAM *info= (MI_SORT_PARAM*) arg; 00313 int error; 00314 uint memavl,old_memavl,keys,sort_length; 00315 uint idx, maxbuffer; 00316 uchar **sort_keys=0; 00317 00318 LINT_INIT(keys); 00319 00320 error=1; 00321 00322 if (my_thread_init()) 00323 goto err; 00324 if (info->sort_info->got_error) 00325 goto err; 00326 00327 if (info->keyinfo->flag && HA_VAR_LENGTH_KEY) 00328 { 00329 info->write_keys=write_keys_varlen; 00330 info->read_to_buffer=read_to_buffer_varlen; 00331 info->write_key=write_merge_key_varlen; 00332 } 00333 else 00334 { 00335 info->write_keys=write_keys; 00336 info->read_to_buffer=read_to_buffer; 00337 info->write_key=write_merge_key; 00338 } 00339 00340 my_b_clear(&info->tempfile); 00341 my_b_clear(&info->tempfile_for_exceptions); 00342 bzero((char*) &info->buffpek,sizeof(info->buffpek)); 00343 bzero((char*) &info->unique, sizeof(info->unique)); 00344 sort_keys= (uchar **) NULL; 00345 00346 memavl=max(info->sortbuff_size, MIN_SORT_MEMORY); 00347 idx= info->sort_info->max_records; 00348 sort_length= info->key_length; 00349 maxbuffer= 1; 00350 00351 while (memavl >= MIN_SORT_MEMORY) 00352 { 00353 if ((my_off_t) (idx+1)*(sort_length+sizeof(char*)) <= 00354 (my_off_t) memavl) 00355 keys= idx+1; 00356 else 00357 { 00358 uint skr; 00359 do 00360 { 00361 skr=maxbuffer; 00362 if (memavl < sizeof(BUFFPEK)*maxbuffer || 00363 (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/ 00364 (sort_length+sizeof(char*))) <= 1) 00365 { 00366 mi_check_print_error(info->sort_info->param, 00367 "sort_buffer_size is to small"); 00368 goto err; 00369 } 00370 } 00371 while ((maxbuffer= (int) (idx/(keys-1)+1)) != skr); 00372 } 00373 if ((sort_keys=(uchar **)my_malloc(keys*(sort_length+sizeof(char*))+ 00374 ((info->keyinfo->flag & HA_FULLTEXT) ? 00375 HA_FT_MAXBYTELEN : 0), MYF(0)))) 00376 { 00377 if (my_init_dynamic_array(&info->buffpek, sizeof(BUFFPEK), 00378 maxbuffer, maxbuffer/2)) 00379 { 00380 my_free((gptr) sort_keys,MYF(0)); 00381 sort_keys= (uchar **) NULL; /* for err: label */ 00382 } 00383 else 00384 break; 00385 } 00386 old_memavl=memavl; 00387 if ((memavl=memavl/4*3) < MIN_SORT_MEMORY && old_memavl > MIN_SORT_MEMORY) 00388 memavl=MIN_SORT_MEMORY; 00389 } 00390 if (memavl < MIN_SORT_MEMORY) 00391 { 00392 mi_check_print_error(info->sort_info->param,"Sort buffer to small"); /* purecov: tested */ 00393 goto err; /* purecov: tested */ 00394 } 00395 00396 if (info->sort_info->param->testflag & T_VERBOSE) 00397 printf("Key %d - Allocating buffer for %d keys\n",info->key+1,keys); 00398 info->sort_keys=sort_keys; 00399 00400 idx=error=0; 00401 sort_keys[0]=(uchar*) (sort_keys+keys); 00402 00403 while (!(error=info->sort_info->got_error) && 00404 !(error=(*info->key_read)(info,sort_keys[idx]))) 00405 { 00406 if (info->real_key_length > info->key_length) 00407 { 00408 if (write_key(info,sort_keys[idx], &info->tempfile_for_exceptions)) 00409 goto err; 00410 continue; 00411 } 00412 00413 if (++idx == keys) 00414 { 00415 if (info->write_keys(info,sort_keys,idx-1, 00416 (BUFFPEK *)alloc_dynamic(&info->buffpek), 00417 &info->tempfile)) 00418 goto err; 00419 sort_keys[0]=(uchar*) (sort_keys+keys); 00420 memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length); 00421 idx=1; 00422 } 00423 sort_keys[idx]=sort_keys[idx-1]+info->key_length; 00424 } 00425 if (error > 0) 00426 goto err; 00427 if (info->buffpek.elements) 00428 { 00429 if (info->write_keys(info,sort_keys, idx, 00430 (BUFFPEK *) alloc_dynamic(&info->buffpek), &info->tempfile)) 00431 goto err; 00432 info->keys=(info->buffpek.elements-1)*(keys-1)+idx; 00433 } 00434 else 00435 info->keys=idx; 00436 00437 info->sort_keys_length=keys; 00438 goto ok; 00439 00440 err: 00441 info->sort_info->got_error=1; /* no need to protect this with a mutex */ 00442 if (sort_keys) 00443 my_free((gptr) sort_keys,MYF(0)); 00444 info->sort_keys=0; 00445 delete_dynamic(& info->buffpek); 00446 close_cached_file(&info->tempfile); 00447 close_cached_file(&info->tempfile_for_exceptions); 00448 00449 ok: 00450 free_root(&info->wordroot, MYF(0)); 00451 remove_io_thread(&info->read_cache); 00452 pthread_mutex_lock(&info->sort_info->mutex); 00453 info->sort_info->threads_running--; 00454 pthread_cond_signal(&info->sort_info->cond); 00455 pthread_mutex_unlock(&info->sort_info->mutex); 00456 my_thread_end(); 00457 return NULL; 00458 } 00459 00460 00461 int thr_write_keys(MI_SORT_PARAM *sort_param) 00462 { 00463 SORT_INFO *sort_info=sort_param->sort_info; 00464 MI_CHECK *param=sort_info->param; 00465 ulong length, keys; 00466 ulong *rec_per_key_part=param->rec_per_key_part; 00467 int got_error=sort_info->got_error; 00468 uint i; 00469 MI_INFO *info=sort_info->info; 00470 MYISAM_SHARE *share=info->s; 00471 MI_SORT_PARAM *sinfo; 00472 byte *mergebuf=0; 00473 LINT_INIT(length); 00474 00475 for (i= 0, sinfo= sort_param ; 00476 i < sort_info->total_keys ; 00477 i++, rec_per_key_part+=sinfo->keyinfo->keysegs, sinfo++) 00478 { 00479 if (!sinfo->sort_keys) 00480 { 00481 got_error=1; 00482 continue; 00483 } 00484 if (!got_error) 00485 { 00486 mi_set_key_active(share->state.key_map, sinfo->key); 00487 if (!sinfo->buffpek.elements) 00488 { 00489 if (param->testflag & T_VERBOSE) 00490 { 00491 printf("Key %d - Dumping %u keys\n",sinfo->key+1, sinfo->keys); 00492 fflush(stdout); 00493 } 00494 if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) || 00495 flush_ft_buf(sinfo) || flush_pending_blocks(sinfo)) 00496 got_error=1; 00497 } 00498 if (!got_error && param->testflag & T_STATISTICS) 00499 update_key_parts(sinfo->keyinfo, rec_per_key_part, sinfo->unique, 00500 param->stats_method == MI_STATS_METHOD_IGNORE_NULLS? 00501 sinfo->notnull: NULL, 00502 (ulonglong) info->state->records); 00503 } 00504 my_free((gptr) sinfo->sort_keys,MYF(0)); 00505 my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff), 00506 MYF(MY_ALLOW_ZERO_PTR)); 00507 sinfo->sort_keys=0; 00508 } 00509 00510 for (i= 0, sinfo= sort_param ; 00511 i < sort_info->total_keys ; 00512 i++, 00513 delete_dynamic(&sinfo->buffpek), 00514 close_cached_file(&sinfo->tempfile), 00515 close_cached_file(&sinfo->tempfile_for_exceptions), 00516 sinfo++) 00517 { 00518 if (got_error) 00519 continue; 00520 if (sinfo->keyinfo->flag && HA_VAR_LENGTH_KEY) 00521 { 00522 sinfo->write_keys=write_keys_varlen; 00523 sinfo->read_to_buffer=read_to_buffer_varlen; 00524 sinfo->write_key=write_merge_key_varlen; 00525 } 00526 else 00527 { 00528 sinfo->write_keys=write_keys; 00529 sinfo->read_to_buffer=read_to_buffer; 00530 sinfo->write_key=write_merge_key; 00531 } 00532 if (sinfo->buffpek.elements) 00533 { 00534 uint maxbuffer=sinfo->buffpek.elements-1; 00535 if (!mergebuf) 00536 { 00537 length=param->sort_buffer_length; 00538 while (length >= MIN_SORT_MEMORY && !mergebuf) 00539 { 00540 mergebuf=my_malloc(length, MYF(0)); 00541 length=length*3/4; 00542 } 00543 if (!mergebuf) 00544 { 00545 got_error=1; 00546 continue; 00547 } 00548 } 00549 keys=length/sinfo->key_length; 00550 if (maxbuffer >= MERGEBUFF2) 00551 { 00552 if (param->testflag & T_VERBOSE) 00553 printf("Key %d - Merging %u keys\n",sinfo->key+1, sinfo->keys); 00554 if (merge_many_buff(sinfo, keys, (uchar **)mergebuf, 00555 dynamic_element(&sinfo->buffpek, 0, BUFFPEK *), 00556 (int*) &maxbuffer, &sinfo->tempfile)) 00557 { 00558 got_error=1; 00559 continue; 00560 } 00561 } 00562 if (flush_io_cache(&sinfo->tempfile) || 00563 reinit_io_cache(&sinfo->tempfile,READ_CACHE,0L,0,0)) 00564 { 00565 got_error=1; 00566 continue; 00567 } 00568 if (param->testflag & T_VERBOSE) 00569 printf("Key %d - Last merge and dumping keys\n", sinfo->key+1); 00570 if (merge_index(sinfo, keys, (uchar **)mergebuf, 00571 dynamic_element(&sinfo->buffpek,0,BUFFPEK *), 00572 maxbuffer,&sinfo->tempfile) || 00573 flush_ft_buf(sinfo) || 00574 flush_pending_blocks(sinfo)) 00575 { 00576 got_error=1; 00577 continue; 00578 } 00579 } 00580 if (my_b_inited(&sinfo->tempfile_for_exceptions)) 00581 { 00582 uint key_length; 00583 00584 if (param->testflag & T_VERBOSE) 00585 printf("Key %d - Dumping 'long' keys\n", sinfo->key+1); 00586 00587 if (flush_io_cache(&sinfo->tempfile_for_exceptions) || 00588 reinit_io_cache(&sinfo->tempfile_for_exceptions,READ_CACHE,0L,0,0)) 00589 { 00590 got_error=1; 00591 continue; 00592 } 00593 00594 while (!got_error && 00595 !my_b_read(&sinfo->tempfile_for_exceptions,(byte*)&key_length, 00596 sizeof(key_length))) 00597 { 00598 byte ft_buf[HA_FT_MAXBYTELEN + HA_FT_WLEN + 10]; 00599 if (key_length > sizeof(ft_buf) || 00600 my_b_read(&sinfo->tempfile_for_exceptions, (byte*)ft_buf, 00601 (uint)key_length) || 00602 _mi_ck_write(info, sinfo->key, (uchar*)ft_buf, 00603 key_length - info->s->rec_reflength)) 00604 got_error=1; 00605 } 00606 } 00607 } 00608 my_free((gptr) mergebuf,MYF(MY_ALLOW_ZERO_PTR)); 00609 return got_error; 00610 } 00611 #endif /* THREAD */ 00612 00613 /* Write all keys in memory to file for later merge */ 00614 00615 static int NEAR_F write_keys(MI_SORT_PARAM *info, register uchar **sort_keys, 00616 uint count, BUFFPEK *buffpek, IO_CACHE *tempfile) 00617 { 00618 uchar **end; 00619 uint sort_length=info->key_length; 00620 DBUG_ENTER("write_keys"); 00621 00622 qsort2((byte*) sort_keys,count,sizeof(byte*),(qsort2_cmp) info->key_cmp, 00623 info); 00624 if (!my_b_inited(tempfile) && 00625 open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST", 00626 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw)) 00627 DBUG_RETURN(1); /* purecov: inspected */ 00628 00629 buffpek->file_pos=my_b_tell(tempfile); 00630 buffpek->count=count; 00631 00632 for (end=sort_keys+count ; sort_keys != end ; sort_keys++) 00633 { 00634 if (my_b_write(tempfile,(byte*) *sort_keys,(uint) sort_length)) 00635 DBUG_RETURN(1); /* purecov: inspected */ 00636 } 00637 DBUG_RETURN(0); 00638 } /* write_keys */ 00639 00640 00641 static inline int 00642 my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, byte *bufs) 00643 { 00644 int err; 00645 uint16 len = _mi_keylength(info->keyinfo, (uchar*) bufs); 00646 00647 /* The following is safe as this is a local file */ 00648 if ((err= my_b_write(to_file, (byte*)&len, sizeof(len)))) 00649 return (err); 00650 if ((err= my_b_write(to_file,bufs, (uint) len))) 00651 return (err); 00652 return (0); 00653 } 00654 00655 00656 static int NEAR_F write_keys_varlen(MI_SORT_PARAM *info, 00657 register uchar **sort_keys, 00658 uint count, BUFFPEK *buffpek, 00659 IO_CACHE *tempfile) 00660 { 00661 uchar **end; 00662 int err; 00663 DBUG_ENTER("write_keys_varlen"); 00664 00665 qsort2((byte*) sort_keys,count,sizeof(byte*),(qsort2_cmp) info->key_cmp, 00666 info); 00667 if (!my_b_inited(tempfile) && 00668 open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST", 00669 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw)) 00670 DBUG_RETURN(1); /* purecov: inspected */ 00671 00672 buffpek->file_pos=my_b_tell(tempfile); 00673 buffpek->count=count; 00674 for (end=sort_keys+count ; sort_keys != end ; sort_keys++) 00675 { 00676 if ((err= my_var_write(info,tempfile, (byte*) *sort_keys))) 00677 DBUG_RETURN(err); 00678 } 00679 DBUG_RETURN(0); 00680 } /* write_keys_varlen */ 00681 00682 00683 static int NEAR_F write_key(MI_SORT_PARAM *info, uchar *key, 00684 IO_CACHE *tempfile) 00685 { 00686 uint key_length=info->real_key_length; 00687 DBUG_ENTER("write_key"); 00688 00689 if (!my_b_inited(tempfile) && 00690 open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST", 00691 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw)) 00692 DBUG_RETURN(1); 00693 00694 if (my_b_write(tempfile,(byte*)&key_length,sizeof(key_length)) || 00695 my_b_write(tempfile,(byte*)key,(uint) key_length)) 00696 DBUG_RETURN(1); 00697 DBUG_RETURN(0); 00698 } /* write_key */ 00699 00700 00701 /* Write index */ 00702 00703 static int NEAR_F write_index(MI_SORT_PARAM *info, register uchar **sort_keys, 00704 register uint count) 00705 { 00706 DBUG_ENTER("write_index"); 00707 00708 qsort2((gptr) sort_keys,(size_t) count,sizeof(byte*), 00709 (qsort2_cmp) info->key_cmp,info); 00710 while (count--) 00711 { 00712 if ((*info->key_write)(info,*sort_keys++)) 00713 DBUG_RETURN(-1); /* purecov: inspected */ 00714 } 00715 DBUG_RETURN(0); 00716 } /* write_index */ 00717 00718 00719 /* Merge buffers to make < MERGEBUFF2 buffers */ 00720 00721 static int NEAR_F merge_many_buff(MI_SORT_PARAM *info, uint keys, 00722 uchar **sort_keys, BUFFPEK *buffpek, 00723 int *maxbuffer, IO_CACHE *t_file) 00724 { 00725 register int i; 00726 IO_CACHE t_file2, *from_file, *to_file, *temp; 00727 BUFFPEK *lastbuff; 00728 DBUG_ENTER("merge_many_buff"); 00729 00730 if (*maxbuffer < MERGEBUFF2) 00731 DBUG_RETURN(0); /* purecov: inspected */ 00732 if (flush_io_cache(t_file) || 00733 open_cached_file(&t_file2,my_tmpdir(info->tmpdir),"ST", 00734 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw)) 00735 DBUG_RETURN(1); /* purecov: inspected */ 00736 00737 from_file= t_file ; to_file= &t_file2; 00738 while (*maxbuffer >= MERGEBUFF2) 00739 { 00740 reinit_io_cache(from_file,READ_CACHE,0L,0,0); 00741 reinit_io_cache(to_file,WRITE_CACHE,0L,0,0); 00742 lastbuff=buffpek; 00743 for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF) 00744 { 00745 if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++, 00746 buffpek+i,buffpek+i+MERGEBUFF-1)) 00747 break; /* purecov: inspected */ 00748 } 00749 if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++, 00750 buffpek+i,buffpek+ *maxbuffer)) 00751 break; /* purecov: inspected */ 00752 if (flush_io_cache(to_file)) 00753 break; /* purecov: inspected */ 00754 temp=from_file; from_file=to_file; to_file=temp; 00755 *maxbuffer= (int) (lastbuff-buffpek)-1; 00756 } 00757 close_cached_file(to_file); /* This holds old result */ 00758 if (to_file == t_file) 00759 *t_file=t_file2; /* Copy result file */ 00760 00761 DBUG_RETURN(*maxbuffer >= MERGEBUFF2); /* Return 1 if interrupted */ 00762 } /* merge_many_buff */ 00763 00764 00765 /* 00766 Read data to buffer 00767 00768 SYNOPSIS 00769 read_to_buffer() 00770 fromfile File to read from 00771 buffpek Where to read from 00772 sort_length max length to read 00773 RESULT 00774 > 0 Ammount of bytes read 00775 -1 Error 00776 */ 00777 00778 static uint NEAR_F read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek, 00779 uint sort_length) 00780 { 00781 register uint count; 00782 uint length; 00783 00784 if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count))) 00785 { 00786 if (my_pread(fromfile->file,(byte*) buffpek->base, 00787 (length= sort_length*count),buffpek->file_pos,MYF_RW)) 00788 return((uint) -1); /* purecov: inspected */ 00789 buffpek->key=buffpek->base; 00790 buffpek->file_pos+= length; /* New filepos */ 00791 buffpek->count-= count; 00792 buffpek->mem_count= count; 00793 } 00794 return (count*sort_length); 00795 } /* read_to_buffer */ 00796 00797 static uint NEAR_F read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek, 00798 uint sort_length) 00799 { 00800 register uint count; 00801 uint16 length_of_key = 0; 00802 uint idx; 00803 uchar *buffp; 00804 00805 if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count))) 00806 { 00807 buffp = buffpek->base; 00808 00809 for (idx=1;idx<=count;idx++) 00810 { 00811 if (my_pread(fromfile->file,(byte*)&length_of_key,sizeof(length_of_key), 00812 buffpek->file_pos,MYF_RW)) 00813 return((uint) -1); 00814 buffpek->file_pos+=sizeof(length_of_key); 00815 if (my_pread(fromfile->file,(byte*) buffp,length_of_key, 00816 buffpek->file_pos,MYF_RW)) 00817 return((uint) -1); 00818 buffpek->file_pos+=length_of_key; 00819 buffp = buffp + sort_length; 00820 } 00821 buffpek->key=buffpek->base; 00822 buffpek->count-= count; 00823 buffpek->mem_count= count; 00824 } 00825 return (count*sort_length); 00826 } /* read_to_buffer_varlen */ 00827 00828 00829 static int NEAR_F write_merge_key_varlen(MI_SORT_PARAM *info, 00830 IO_CACHE *to_file,char* key, 00831 uint sort_length, uint count) 00832 { 00833 uint idx; 00834 00835 char *bufs = key; 00836 for (idx=1;idx<=count;idx++) 00837 { 00838 int err; 00839 if ((err= my_var_write(info,to_file, (byte*) bufs))) 00840 return (err); 00841 bufs=bufs+sort_length; 00842 } 00843 return(0); 00844 } 00845 00846 00847 static int NEAR_F write_merge_key(MI_SORT_PARAM *info __attribute__((unused)), 00848 IO_CACHE *to_file, char* key, 00849 uint sort_length, uint count) 00850 { 00851 return my_b_write(to_file,(byte*) key,(uint) sort_length*count); 00852 } 00853 00854 /* 00855 Merge buffers to one buffer 00856 If to_file == 0 then use info->key_write 00857 */ 00858 00859 static int NEAR_F 00860 merge_buffers(MI_SORT_PARAM *info, uint keys, IO_CACHE *from_file, 00861 IO_CACHE *to_file, uchar **sort_keys, BUFFPEK *lastbuff, 00862 BUFFPEK *Fb, BUFFPEK *Tb) 00863 { 00864 int error; 00865 uint sort_length,maxcount; 00866 ha_rows count; 00867 my_off_t to_start_filepos; 00868 uchar *strpos; 00869 BUFFPEK *buffpek,**refpek; 00870 QUEUE queue; 00871 volatile int *killed= killed_ptr(info->sort_info->param); 00872 00873 DBUG_ENTER("merge_buffers"); 00874 00875 count=error=0; 00876 maxcount=keys/((uint) (Tb-Fb) +1); 00877 LINT_INIT(to_start_filepos); 00878 if (to_file) 00879 to_start_filepos=my_b_tell(to_file); 00880 strpos=(uchar*) sort_keys; 00881 sort_length=info->key_length; 00882 00883 if (init_queue(&queue,(uint) (Tb-Fb)+1,offsetof(BUFFPEK,key),0, 00884 (int (*)(void*, byte *,byte*)) info->key_cmp, 00885 (void*) info)) 00886 DBUG_RETURN(1); /* purecov: inspected */ 00887 00888 for (buffpek= Fb ; buffpek <= Tb ; buffpek++) 00889 { 00890 count+= buffpek->count; 00891 buffpek->base= strpos; 00892 buffpek->max_keys=maxcount; 00893 strpos+= (uint) (error=(int) info->read_to_buffer(from_file,buffpek, 00894 sort_length)); 00895 if (error == -1) 00896 goto err; /* purecov: inspected */ 00897 queue_insert(&queue,(char*) buffpek); 00898 } 00899 00900 while (queue.elements > 1) 00901 { 00902 for (;;) 00903 { 00904 if (*killed) 00905 { 00906 error=1; goto err; 00907 } 00908 buffpek=(BUFFPEK*) queue_top(&queue); 00909 if (to_file) 00910 { 00911 if (info->write_key(info,to_file,(byte*) buffpek->key, 00912 (uint) sort_length,1)) 00913 { 00914 error=1; goto err; /* purecov: inspected */ 00915 } 00916 } 00917 else 00918 { 00919 if ((*info->key_write)(info,(void*) buffpek->key)) 00920 { 00921 error=1; goto err; /* purecov: inspected */ 00922 } 00923 } 00924 buffpek->key+=sort_length; 00925 if (! --buffpek->mem_count) 00926 { 00927 if (!(error=(int) info->read_to_buffer(from_file,buffpek,sort_length))) 00928 { 00929 uchar *base=buffpek->base; 00930 uint max_keys=buffpek->max_keys; 00931 00932 VOID(queue_remove(&queue,0)); 00933 00934 /* Put room used by buffer to use in other buffer */ 00935 for (refpek= (BUFFPEK**) &queue_top(&queue); 00936 refpek <= (BUFFPEK**) &queue_end(&queue); 00937 refpek++) 00938 { 00939 buffpek= *refpek; 00940 if (buffpek->base+buffpek->max_keys*sort_length == base) 00941 { 00942 buffpek->max_keys+=max_keys; 00943 break; 00944 } 00945 else if (base+max_keys*sort_length == buffpek->base) 00946 { 00947 buffpek->base=base; 00948 buffpek->max_keys+=max_keys; 00949 break; 00950 } 00951 } 00952 break; /* One buffer have been removed */ 00953 } 00954 } 00955 else if (error == -1) 00956 goto err; /* purecov: inspected */ 00957 queue_replaced(&queue); /* Top element has been replaced */ 00958 } 00959 } 00960 buffpek=(BUFFPEK*) queue_top(&queue); 00961 buffpek->base=(uchar *) sort_keys; 00962 buffpek->max_keys=keys; 00963 do 00964 { 00965 if (to_file) 00966 { 00967 if (info->write_key(info,to_file,(byte*) buffpek->key, 00968 sort_length,buffpek->mem_count)) 00969 { 00970 error=1; goto err; /* purecov: inspected */ 00971 } 00972 } 00973 else 00974 { 00975 register uchar *end; 00976 strpos= buffpek->key; 00977 for (end=strpos+buffpek->mem_count*sort_length; 00978 strpos != end ; 00979 strpos+=sort_length) 00980 { 00981 if ((*info->key_write)(info,(void*) strpos)) 00982 { 00983 error=1; goto err; /* purecov: inspected */ 00984 } 00985 } 00986 } 00987 } 00988 while ((error=(int) info->read_to_buffer(from_file,buffpek,sort_length)) != -1 && 00989 error != 0); 00990 00991 lastbuff->count=count; 00992 if (to_file) 00993 lastbuff->file_pos=to_start_filepos; 00994 err: 00995 delete_queue(&queue); 00996 DBUG_RETURN(error); 00997 } /* merge_buffers */ 00998 00999 01000 /* Do a merge to output-file (save only positions) */ 01001 01002 static int NEAR_F 01003 merge_index(MI_SORT_PARAM *info, uint keys, uchar **sort_keys, 01004 BUFFPEK *buffpek, int maxbuffer, IO_CACHE *tempfile) 01005 { 01006 DBUG_ENTER("merge_index"); 01007 if (merge_buffers(info,keys,tempfile,(IO_CACHE*) 0,sort_keys,buffpek,buffpek, 01008 buffpek+maxbuffer)) 01009 DBUG_RETURN(1); /* purecov: inspected */ 01010 DBUG_RETURN(0); 01011 } /* merge_index */ 01012 01013 static int 01014 flush_ft_buf(MI_SORT_PARAM *info) 01015 { 01016 int err=0; 01017 if (info->sort_info->ft_buf) 01018 { 01019 err=sort_ft_buf_flush(info); 01020 my_free((gptr)info->sort_info->ft_buf, MYF(0)); 01021 info->sort_info->ft_buf=0; 01022 } 01023 return err; 01024 } 01025
1.4.7

