FreeTDS API
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Macros Modules Pages
buffering.h
1 typedef struct dblib_buffer_row {
5  unsigned char *row_data;
7  DBINT row;
9  TDS_INT *sizes;
11 
12 static void buffer_struct_print(const DBPROC_ROWBUF *buf);
13 static RETCODE buffer_save_row(DBPROCESS *dbproc);
14 static DBLIB_BUFFER_ROW* buffer_row_address(const DBPROC_ROWBUF * buf, int idx);
15 
16 #if ENABLE_EXTRA_CHECKS
17 static void buffer_check(const DBPROC_ROWBUF *buf)
18 {
19  int i;
20 
21  /* no buffering */
22  if (buf->capacity == 0 || buf->capacity == 1) {
23  assert(buf->head == 0);
24  assert(buf->tail == 0 || buf->tail == 1);
25  assert(buf->capacity == 1 || buf->rows == NULL);
26  return;
27  }
28 
29  assert(buf->capacity > 0);
30  assert(buf->head >= 0);
31  assert(buf->tail >= 0);
32  assert(buf->head < buf->capacity);
33  assert(buf->tail <= buf->capacity);
34 
35  /* check empty */
36  if (buf->tail == buf->capacity) {
37  assert(buf->head == 0);
38  for (i = 0; buf->rows && i < buf->capacity; ++i) {
39  assert(buf->rows[i].resinfo == NULL);
40  assert(buf->rows[i].row_data == NULL);
41  assert(buf->rows[i].sizes == NULL);
42  assert(buf->rows[i].row == 0);
43  }
44  return;
45  }
46 
47  if (buf->rows == NULL)
48  return;
49 
50  /* check filled part */
51  i = buf->tail;
52  do {
53  assert(i >= 0 && i < buf->capacity);
54  assert(buf->rows[i].resinfo != NULL);
55  assert(buf->rows[i].row > 0);
56  assert(buf->rows[i].row <= buf->received);
57  ++i;
58  if (i == buf->capacity)
59  i = 0;
60  } while (i != buf->head);
61 
62  /* check empty part */
63  if (buf->head != buf->tail) {
64  i = buf->head;
65  do {
66  assert(i >= 0 && i < buf->capacity);
67  assert(buf->rows[i].resinfo == NULL);
68  assert(buf->rows[i].row_data == NULL);
69  assert(buf->rows[i].sizes == NULL);
70  assert(buf->rows[i].row == 0);
71  ++i;
72  if (i == buf->capacity)
73  i = 0;
74  } while (i != buf->tail);
75  }
76 }
77 #define BUFFER_CHECK(buf) buffer_check(buf)
78 #else
79 #define BUFFER_CHECK(buf) do {} while(0)
80 #endif
81 
110 static int
111 buffer_count(const DBPROC_ROWBUF *buf)
112 {
113  BUFFER_CHECK(buf);
114  return (buf->head > buf->tail) ?
115  buf->head - buf->tail : /* |...TddddH....| */
116  buf->capacity - (buf->tail - buf->head); /* |ddddH....Tddd| */
117 }
118 
122 static int
123 buffer_is_full(const DBPROC_ROWBUF *buf)
124 {
125  BUFFER_CHECK(buf);
126  return buf->capacity == buffer_count(buf) && buf->capacity > 1;
127 }
128 
129 #ifndef NDEBUG
130 static int
131 buffer_index_valid(const DBPROC_ROWBUF *buf, int idx)
132 {
133  BUFFER_CHECK(buf);
134  if (buf->tail <= buf->head)
135  if (buf->head <= idx && idx <= buf->tail)
136  return 1;
137 
138  if (0 <= idx && idx <= buf->head)
139  return 1;
140 
141  if (buf->tail <= idx && idx < buf->capacity)
142  return 1;
143 #if 0
144  printf("buffer_index_valid: idx = %d\n", idx);
145  buffer_struct_print(buf);
146 #endif
147  return 0;
148 }
149 #endif
150 
151 static void
152 buffer_free_row(DBLIB_BUFFER_ROW *row)
153 {
154  if (row->sizes)
155  TDS_ZERO_FREE(row->sizes);
156  if (row->row_data) {
157  tds_free_row(row->resinfo, row->row_data);
158  row->row_data = NULL;
159  }
160  tds_free_results(row->resinfo);
161  row->resinfo = NULL;
162  row->row = 0;
163 }
164 
165 /*
166  * Buffer is freed at slightly odd points, whenever
167  * capacity changes:
168  *
169  * 1. When setting capacity, to release prior buffer.
170  * 2. By dbresults. When called the second time, it has to
171  * release prior storage because the new resultset will have
172  * a different width.
173  * 3. By dbclose(), else open/close/open would leak.
174  */
175 static void
176 buffer_free(DBPROC_ROWBUF *buf)
177 {
178  BUFFER_CHECK(buf);
179  if (buf->rows != NULL) {
180  int i;
181  for (i = 0; i < buf->capacity; ++i)
182  buffer_free_row(&buf->rows[i]);
183  TDS_ZERO_FREE(buf->rows);
184  }
185  BUFFER_CHECK(buf);
186 }
187 
188 /*
189  * When no rows are currently buffered (and the buffer is allocated)
190  * set the indices to their initial positions.
191  */
192 static void
193 buffer_reset(DBPROC_ROWBUF *buf)
194 {
195  buf->head = 0;
196  buf->current = buf->tail = buf->capacity;
197  BUFFER_CHECK(buf);
198 }
199 
200 static int
201 buffer_idx_increment(const DBPROC_ROWBUF *buf, int idx)
202 {
203  if (++idx >= buf->capacity) {
204  idx = 0;
205  }
206  return idx;
207 }
208 
213 static DBLIB_BUFFER_ROW*
214 buffer_row_address(const DBPROC_ROWBUF * buf, int idx)
215 {
216  BUFFER_CHECK(buf);
217  if (idx < 0 || idx >= buf->capacity) {
218  printf("idx is %d:\n", idx);
219  buffer_struct_print(buf);
220  return NULL;
221  }
222 
223  return &(buf->rows[idx]);
224 }
225 
229 static DBINT
230 buffer_idx2row(const DBPROC_ROWBUF *buf, int idx)
231 {
232  BUFFER_CHECK(buf);
233  return buffer_row_address(buf, idx)->row;
234 }
235 
239 static int
240 buffer_row2idx(const DBPROC_ROWBUF *buf, int row_number)
241 {
242  int i = buf->tail;
243 #ifndef NDEBUG
244  int ii = 0;
245 #endif
246 
247  BUFFER_CHECK(buf);
248  if (i == buf->capacity) {
249  assert (buf->head == 0);
250  return -1; /* no rows buffered */
251  }
252 
253  /*
254  * March through the buffers from tail to head, stop if we find our row.
255  * A full queue is indicated by tail == head (which means we can't write).
256  */
257  do {
258  if (buffer_idx2row(buf, i) == row_number)
259  return i;
260 
261  assert(ii++ < buf->capacity); /* prevent infinite loop */
262 
263  i = buffer_idx_increment(buf, i);
264  } while (i != buf->head);
265 
266  return -1;
267 }
268 
273 static void
274 buffer_delete_rows(DBPROC_ROWBUF * buf, int count)
275 {
276  int i;
277 
278  BUFFER_CHECK(buf);
279  if (count < 0 || count > buffer_count(buf)) {
280  count = buffer_count(buf);
281  }
282 
283  for (i=0; i < count; i++) {
284  if (buf->tail < buf->capacity)
285  buffer_free_row(&buf->rows[buf->tail]);
286  buf->tail = buffer_idx_increment(buf, buf->tail);
287  /*
288  * If deleting rows from the buffer catches the tail to the head,
289  * return to the initial position. Otherwise, it will look full.
290  */
291  if (buf->tail == buf->head) {
292  buffer_reset(buf);
293  break;
294  }
295  }
296 #if 0
297  buffer_struct_print(buf);
298 #endif
299  BUFFER_CHECK(buf);
300 }
301 
302 static void
303 buffer_transfer_bound_data(DBPROC_ROWBUF *buf, TDS_INT res_type, TDS_INT compute_id, DBPROCESS * dbproc, int idx)
304 {
305  int i;
306  BYTE *src;
307  const DBLIB_BUFFER_ROW *row;
308 
309  tdsdump_log(TDS_DBG_FUNC, "buffer_transfer_bound_data(%p %d %d %p %d)\n", buf, res_type, compute_id, dbproc, idx);
310  BUFFER_CHECK(buf);
311  assert(buffer_index_valid(buf, idx));
312 
313  row = buffer_row_address(buf, idx);
314  assert(row->resinfo);
315 
316  for (i = 0; i < row->resinfo->num_cols; i++) {
317  TDS_SERVER_TYPE srctype;
318  DBINT srclen;
319  TDSCOLUMN *curcol = row->resinfo->columns[i];
320 
321  if (row->sizes)
322  curcol->column_cur_size = row->sizes[i];
323 
324  srclen = curcol->column_cur_size;
325 
326  if (curcol->column_nullbind) {
327  if (srclen < 0) {
328  *(DBINT *)(curcol->column_nullbind) = -1;
329  } else {
330  *(DBINT *)(curcol->column_nullbind) = 0;
331  }
332  }
333  if (!curcol->column_varaddr)
334  continue;
335 
336  if (srclen <= 0) {
337  if (srclen == 0 || !curcol->column_nullbind)
338  dbgetnull(dbproc, curcol->column_bindtype, curcol->column_bindlen,
339  (BYTE *) curcol->column_varaddr);
340  continue;
341  }
342 
343  srctype = tds_get_conversion_type(curcol->column_type, curcol->column_size);
344 
345  if (row->row_data)
346  src = &row->row_data[curcol->column_data - row->resinfo->current_row];
347  else
348  src = curcol->column_data;
349  if (is_blob_col(curcol))
350  src = (BYTE *) ((TDSBLOB *) src)->textvalue;
351 
352  copy_data_to_host_var(dbproc, srctype, src, srclen,
353  (BYTE *) curcol->column_varaddr, curcol->column_bindlen,
354  curcol->column_bindtype, (DBINT*) curcol->column_nullbind);
355  }
356 
357  /*
358  * This function always bumps current. Usually, it's called
359  * by dbnextrow(), so bumping current is a pretty obvious choice.
360  * It can also be called by dbgetrow(), but that function also
361  * causes the bump. If you call dbgetrow() for row N, a subsequent
362  * call to dbnextrow() yields N+1.
363  */
364  buf->current = buffer_idx_increment(buf, buf->current);
365 
366 } /* end buffer_transfer_bound_data() */
367 
368 static void
369 buffer_struct_print(const DBPROC_ROWBUF *buf)
370 {
371  assert(buf);
372 
373  printf("\t%d rows in buffer\n", buffer_count(buf));
374 
375  printf("\thead = %d\t", buf->head);
376  printf("\ttail = %d\t", buf->tail);
377  printf("\tcurrent = %d\n", buf->current);
378  printf("\tcapacity = %d\t", buf->capacity);
379  printf("\thead row number = %d\n", buf->received);
380 }
381 
382 /* * * Functions called only by public db-lib API take DBPROCESS* * */
383 
400 static int
401 buffer_current_index(const DBPROCESS *dbproc)
402 {
403  const DBPROC_ROWBUF *buf = &dbproc->row_buf;
404 #if 0
405  buffer_struct_print(buf);
406 #endif
407  if (buf->capacity <= 1) /* no buffering */
408  return -1;
409  if (buf->current == buf->head || buf->current == buf->capacity)
410  return -1;
411 
412  assert(buf->current >= 0);
413  assert(buf->current < buf->capacity);
414 
415  if( buf->tail < buf->head) {
416  assert(buf->tail < buf->current);
417  assert(buf->current < buf->head);
418  } else {
419  if (buf->current > buf->head)
420  assert(buf->current > buf->tail);
421  }
422  return buf->current;
423 }
424 
425 /*
426  * Normally called by dbsetopt() to prepare for buffering
427  * Called with nrows == 0 by dbopen to safely set buf->rows to NULL.
428  */
429 static void
430 buffer_set_capacity(DBPROCESS *dbproc, int nrows)
431 {
432  DBPROC_ROWBUF *buf = &dbproc->row_buf;
433 
434  buffer_free(buf);
435 
436  memset(buf, 0, sizeof(DBPROC_ROWBUF));
437 
438  if (0 == nrows) {
439  buf->capacity = 1;
440  BUFFER_CHECK(buf);
441  return;
442  }
443 
444  assert(0 < nrows);
445 
446  buf->capacity = nrows;
447  BUFFER_CHECK(buf);
448 }
449 
450 /*
451  * Called only by dbresults(); capacity must be >= 1.
452  * Sybase's documents say dbresults() cannot return FAIL if the prior calls worked,
453  * which is a little strange, because (for FreeTDS, at least), dbresults
454  * is when we learn about the result set's width. Without that information, we
455  * can't allocate memory for the buffer. But if we *fail* to allocate memory,
456  * we're not to communicate it back to the caller?
457  */
458 static void
459 buffer_alloc(DBPROCESS *dbproc)
460 {
461  DBPROC_ROWBUF *buf = &dbproc->row_buf;
462 
463  /* Call this function only after setting capacity. */
464 
465  assert(buf);
466  assert(buf->capacity > 0);
467  assert(buf->rows == NULL);
468 
469  buf->rows = tds_new0(DBLIB_BUFFER_ROW, buf->capacity);
470 
471  assert(buf->rows);
472 
473  buffer_reset(buf);
474 
475  buf->received = 0;
476 }
477 
482 static int
483 buffer_add_row(DBPROCESS *dbproc, TDSRESULTINFO *resinfo)
484 {
485  DBPROC_ROWBUF *buf = &dbproc->row_buf;
486  DBLIB_BUFFER_ROW *row;
487  int i;
488 
489  assert(buf->capacity >= 0);
490 
491  if (buffer_is_full(buf))
492  return -1;
493 
494  row = buffer_row_address(buf, buf->head);
495 
496  /* bump the row number, write it, and move the data to head */
497  if (row->resinfo) {
498  tds_free_row(row->resinfo, row->row_data);
499  tds_free_results(row->resinfo);
500  }
501  row->row = ++buf->received;
502  ++resinfo->ref_count;
503  row->resinfo = resinfo;
504  row->row_data = NULL;
505  if (row->sizes)
506  free(row->sizes);
507  row->sizes = tds_new0(TDS_INT, resinfo->num_cols);
508  for (i = 0; i < resinfo->num_cols; ++i)
509  row->sizes[i] = resinfo->columns[i]->column_cur_size;
510 
511  /* initial condition is head == 0 and tail == capacity */
512  if (buf->tail == buf->capacity) {
513  /* bumping this tail will set it to zero */
514  assert(buf->head == 0);
515  buf->tail = 0;
516  }
517 
518  /* update current, bump the head */
519  buf->current = buf->head;
520  buf->head = buffer_idx_increment(buf, buf->head);
521 
522  return buf->current;
523 }
524 
525 static RETCODE
526 buffer_save_row(DBPROCESS *dbproc)
527 {
528  DBPROC_ROWBUF *buf = &dbproc->row_buf;
529  DBLIB_BUFFER_ROW *row;
530  int idx = buf->head - 1;
531 
532  if (buf->capacity <= 1)
533  return SUCCEED;
534 
535  if (idx < 0)
536  idx = buf->capacity - 1;
537  if (idx >= 0 && idx < buf->capacity) {
538  row = &buf->rows[idx];
539 
540  if (row->resinfo && !row->row_data) {
541  row->row_data = row->resinfo->current_row;
542  tds_alloc_row(row->resinfo);
543  }
544  }
545 
546  return SUCCEED;
547 }
548 
DBINT row
row number
Definition: buffering.h:7
Hold information for any results.
Definition: tds.h:798
RETCODE dbgetnull(DBPROCESS *dbproc, int bindtype, int varlen, BYTE *varaddr)
Definition: dblib.c:539
Definition: dblib.h:49
TDSRET tds_alloc_row(TDSRESULTINFO *res_info)
Allocate space for row store return NULL on out of memory.
Definition: mem.c:512
TDS_INT column_cur_size
size written in variable (ie: char, text, binary).
Definition: tds.h:768
TDS_INT column_size
maximun size of data.
Definition: tds.h:727
TDS_SERVER_TYPE tds_get_conversion_type(TDS_SERVER_TYPE srctype, int colsize)
Return type suitable for conversions (convert all nullable types to fixed type)
Definition: tds_types.h:125
Information about blobs (e.g.
Definition: tds.h:618
unsigned char * row_data
row data, NULL for resinfo->current_row
Definition: buffering.h:5
TDS_INT * sizes
save old sizes
Definition: buffering.h:9
Metadata about columns in regular and compute rows.
Definition: tds.h:721
TDS_SERVER_TYPE column_type
This type can be different from wire type because conversion (e.g.
Definition: tds.h:729
TDSRESULTINFO * resinfo
pointer to result informations
Definition: buffering.h:3
Definition: dblib.h:120
Definition: buffering.h:1
void tdsdump_log(const char *file, unsigned int level_line, const char *fmt,...)
Write a message to the debug log.
Definition: log.c:353