3af3c1bffbd427a4636215c7a00d225dd4ed6c67
[freeside.git] / FS / FS / Cursor.pm
1 package FS::Cursor;
2
3 use strict;
4 use vars qw($DEBUG $buffer);
5 use FS::Record;
6 use FS::UID qw(myconnect driver_name);
7 use Scalar::Util qw(refaddr);
8
9 $DEBUG = 2;
10
11 # this might become a parameter at some point, but right now, you can
12 # "local $FS::Cursor::buffer = X;"
13 $buffer = 200;
14
15 =head1 NAME
16
17 FS::Cursor - Iterator for querying large data sets
18
19 =head1 SYNOPSIS
20
21 use FS::Cursor;
22
23 my $search = FS::Cursor->new('table', { field => 'value' ... });
24 while ( my $row = $search->fetch ) {
25 ...
26 }
27
28 =head1 CLASS METHODS
29
30 =over 4
31
32 =item new ARGUMENTS
33
34 Constructs a cursored search.  Accepts all the same arguments as qsearch,
35 and returns an FS::Cursor object to fetch the rows one at a time.
36
37 =cut
38
39 sub new {
40   my $class = shift;
41   my $q = FS::Record::_query(@_); # builds the statement and parameter list
42   my $dbh = myconnect();
43
44   my $self = {
45     query => $q,
46     class => 'FS::' . ($q->{table} || 'Record'),
47     buffer => [],
48     dbh   => $dbh,
49     position => 0, # for mysql
50   };
51   bless $self, $class;
52
53   # the class of record object to return
54   $self->{class} = "FS::".($q->{table} || 'Record');
55
56   # save for later, so forked children will not destroy me when they exit
57   $self->{pid} = $$;
58
59   $self->{id} = sprintf('cursor%08x', refaddr($self));
60
61   my $statement;
62   if ( driver_name() eq 'Pg' ) {
63     $statement = "DECLARE ".$self->{id}." CURSOR FOR ".$q->{statement};
64   } elsif ( driver_name() eq 'mysql' ) {
65     # build a cursor from scratch
66     $statement = "CREATE TEMPORARY TABLE $self->{id} 
67       (rownum INT AUTO_INCREMENT, PRIMARY KEY (rownum))
68       $q->{statement}";
69   }
70
71   my $sth = $dbh->prepare($statement)
72     or die $dbh->errstr;
73   my $bind = 1;
74   foreach my $value ( @{ $q->{value} } ) {
75     my $bind_type = shift @{ $q->{bind_type} };
76     $sth->bind_param($bind++, $value, $bind_type );
77   }
78
79   $sth->execute or die $sth->errstr;
80   # in mysql, make sure we're not holding any locks on the tables mentioned
81   # in the query; in Pg this will do nothing.
82   $dbh->commit;
83
84   if ( driver_name() eq 'Pg' ) {
85     $self->{fetch} = $dbh->prepare("FETCH FORWARD $buffer FROM ".$self->{id});
86   } elsif ( driver_name() eq 'mysql' ) {
87     $self->{fetch} = $dbh->prepare("SELECT * FROM $self->{id} ORDER BY rownum LIMIT ?, $buffer");
88   }
89
90   $self;
91 }
92
93 =back
94
95 =head1 METHODS
96
97 =over 4
98
99 =item fetch
100
101 Fetch the next row from the search results.
102
103 =cut
104
105 sub fetch {
106   # might be a little more efficient to do a FETCH NEXT 1000 or something
107   # and buffer them locally, but the semantics are simpler this way
108   my $self = shift;
109   if (@{ $self->{buffer} } == 0) {
110     my $rows = $self->refill;
111     return undef if !$rows;
112   }
113   $self->{class}->new(shift @{ $self->{buffer} });
114 }
115
116 sub refill {
117   my $self = shift;
118   my $sth = $self->{fetch};
119   $sth->bind_param(1, $self->{position}) if driver_name() eq 'mysql';
120   $sth->execute or die $sth->errstr;
121   my $result = $self->{fetch}->fetchall_arrayref( {} );
122   $self->{buffer} = $result;
123   $self->{position} += $sth->rows;
124   scalar @$result;
125 }
126
127 sub DESTROY {
128   my $self = shift;
129   return unless $self->{pid} eq $$;
130   if ( driver_name() eq 'Pg' ) {
131     $self->{dbh}->do('CLOSE '. $self->{id})
132       or die $self->{dbh}->errstr; # clean-up the cursor in Pg
133   } elsif ( driver_name() eq 'mysql' ) {
134     # nothing; the temporary table will evaporate when the 
135     # session closes.
136   }
137   $self->{dbh}->rollback;
138   $self->{dbh}->disconnect;
139 }
140
141 =back
142
143 =head1 TO DO
144
145 Replace all uses of qsearch with this.
146
147 =head1 BUGS
148
149 MySQL doesn't support cursors in interactive sessions, only in stored 
150 procedures, so we implement our own.  This has not been extensively tested.
151
152 The cursor will close prematurely if any code issues a rollback/commit. If
153 you need protection against this use qsearch or fork and get a new dbh
154 handle.
155 Normally this issue will represent itself this message.
156 ERROR: cursor "cursorXXXXXXX" does not exist.
157
158 =head1 SEE ALSO
159
160 L<FS::Record>
161
162 =cut
163
164 1;