From 2b740ea3cbb32a3fb3906546552503d9d3cca590 Mon Sep 17 00:00:00 2001 From: Mark Wells Date: Sat, 19 May 2012 14:57:29 -0700 Subject: [PATCH 1/1] paged search to conserve memory in CDR processing, #16723 --- FS/FS/ClientAPI/MyAccount.pm | 2 + FS/FS/PagedSearch.pm | 189 +++++++++++++++++++++++++++++++++++++++++ FS/FS/part_pkg/voip_cdr.pm | 22 +++-- FS/FS/part_pkg/voip_inbound.pm | 10 ++- FS/FS/part_pkg/voip_tiered.pm | 19 +++-- FS/FS/svc_pbx.pm | 32 +++++-- FS/FS/svc_phone.pm | 38 +++++++-- 7 files changed, 278 insertions(+), 34 deletions(-) create mode 100644 FS/FS/PagedSearch.pm diff --git a/FS/FS/ClientAPI/MyAccount.pm b/FS/FS/ClientAPI/MyAccount.pm index a07e345f5..e79fbfc0b 100644 --- a/FS/FS/ClientAPI/MyAccount.pm +++ b/FS/FS/ClientAPI/MyAccount.pm @@ -1913,6 +1913,8 @@ sub list_support_usage { sub _list_cdr_usage { # XXX CDR type support... + # XXX any way to do a paged search on this? + # we have to return the results all at once... my($svc_phone, $begin, $end, %opt) = @_; map [ $_->downstream_csv(%opt, 'keeparray' => 1) ], $svc_phone->get_cdrs( 'begin'=>$begin, 'end'=>$end, ); diff --git a/FS/FS/PagedSearch.pm b/FS/FS/PagedSearch.pm new file mode 100644 index 000000000..09d05c4e6 --- /dev/null +++ b/FS/FS/PagedSearch.pm @@ -0,0 +1,189 @@ +package FS::PagedSearch; + +use strict; +use vars qw($DEBUG $default_limit @EXPORT_OK); +use base qw( Exporter ); +use FS::Record qw(qsearch dbdef); +use Data::Dumper; + +$DEBUG = 0; +$default_limit = 100; + +@EXPORT_OK = 'psearch'; + +=head1 NAME + +FS::PagedSearch - Iterator for querying large data sets + +=head1 SYNOPSIS + +use FS::PagedSearch qw(psearch); + +my $search = psearch('table', { field => 'value' ... }); +$search->limit(100); #optional +while ( my $row = $search->fetch ) { +... +} + +=head1 SUBROUTINES + +=over 4 + +=item psearch ARGUMENTS + +A wrapper around L<FS::Record/qsearch>. 
Accepts all the same arguments +as qsearch, except for the arrayref union query mode, and returns an +FS::PagedSearch object to access the rows of the query one at a time. +If the query doesn't contain an ORDER BY clause already, it will be ordered +by the table's primary key. + +=cut + +sub psearch { + # deep-copy qsearch args + my $q; + if ( ref($_[0]) eq 'ARRAY' ) { + die "union query not supported with psearch"; #yet + } + elsif ( ref($_[0]) eq 'HASH' ) { + %$q = %{ $_[0] }; + } + else { + $q = { + 'table' => shift, + 'hashref' => shift, + 'select' => shift, + 'extra_sql' => shift, + 'cache_obj' => shift, + 'addl_from' => shift, + }; + } + warn Dumper($q) if $DEBUG > 1; + + # clean up query + my $dbdef = dbdef->table($q->{table}); + # qsearch just appends order_by to extra_sql, so do that ourselves + $q->{extra_sql} ||= ''; + $q->{extra_sql} .= ' '.$q->{order_by} if $q->{order_by}; + $q->{order_by} = ''; + # and impose an ordering if needed + if ( not $q->{extra_sql} =~ /order by/i ) { + $q->{extra_sql} .= ' ORDER BY '.$dbdef->primary_key; + } + # and then we'll use order_by for LIMIT/OFFSET + + my $self = { + query => $q, + buffer => [], + offset => 0, + limit => $default_limit, + increment => 1, + }; + bless $self, 'FS::PagedSearch'; + + $self; +} + +=back + +=head1 METHODS + +=over 4 + +=item fetch + +Fetch the next row from the search results and remove it from the buffer. +Returns undef if there are no more rows. + +=cut + +sub fetch { + my $self = shift; + my $b = $self->{buffer}; + $self->refill if @$b == 0; + $self->{offset} += $self->{increment} if @$b; + return shift @$b; +} + +=item adjust ROWS + +Add ROWS to the offset counter. This won't cause rows to be skipped in the +current buffer but will affect the starting point of the next refill. + +=cut + +sub adjust { + my $self = shift; + my $r = shift; + $self->{offset} += $r; +} + +=item limit [ VALUE ] + +Set/get the number of rows to retrieve per page. The default is 100. 
+ +=cut + +sub limit { + my $self = shift; + my $new_limit = shift; + if ( defined($new_limit) ) { + $self->{limit} = $new_limit; + } + $self->{limit}; +} + +=item increment [ VALUE ] + +Set/get the number of rows to increment the offset for each row that's +retrieved. Defaults to 1. If the rows are being modified in a way that +removes them from the result set of the query, it's probably wise to set +this to zero. Setting it to anything else is probably nonsense. + +=cut + +sub increment { + my $self = shift; + my $new_inc = shift; + if ( defined($new_inc) ) { + $self->{increment} = $new_inc; + } + $self->{increment}; +} + + +=item refill + +Run the query, skipping a number of rows set by the row offset, and replace +the contents of the buffer with the result. If there are no more rows, +this will just empty the buffer. Called automatically as needed; don't call +this from outside. + +=cut + +sub refill { + my $self = shift; + my $b = $self->{buffer}; + warn "refilling (limit ".$self->{limit}.", offset ".$self->{offset}.")\n" + if $DEBUG; + warn "discarding ".scalar(@$b)." rows\n" if $DEBUG and @$b; + if ( $self->{limit} > 0 ) { + $self->{query}->{order_by} = 'LIMIT ' . $self->{limit} . + ' OFFSET ' . 
$self->{offset}; + } + @$b = qsearch( $self->{query} ); + my $rows = scalar @$b; + warn "$rows returned\n" if $DEBUG; + + $rows; +} + +=back + +=head1 SEE ALSO + +L + +=cut + +1; diff --git a/FS/FS/part_pkg/voip_cdr.pm b/FS/FS/part_pkg/voip_cdr.pm index aaad974cf..8c3d80d49 100644 --- a/FS/FS/part_pkg/voip_cdr.pm +++ b/FS/FS/part_pkg/voip_cdr.pm @@ -401,9 +401,10 @@ sub calc_usage { #my @invoice_details_sort; #first rate any outstanding CDRs not yet rated - foreach my $cdr ( - $svc_x->get_cdrs( %options ) - ) { + my $cdr_search = $svc_x->psearch_cdrs(%options); + $cdr_search->limit(1000); + $cdr_search->increment(0); # because we're changing their status as we go + while ( my $cdr = $cdr_search->fetch ) { my $error = $cdr->rate( 'part_pkg' => $self, @@ -414,14 +415,19 @@ sub calc_usage { ); die $error if $error; #?? + $cdr_search->adjust(1) if $cdr->freesidestatus eq ''; + # it was skipped without changing status, so increment the + # offset so that we don't re-fetch it on refill + } # $cdr #then add details to invoices & get a total $options{'status'} = 'rated'; - foreach my $cdr ( - $svc_x->get_cdrs( %options ) - ) { + $cdr_search = $svc_x->psearch_cdrs(%options); + $cdr_search->limit(1000); + $cdr_search->increment(0); + while ( my $cdr = $cdr_search->fetch ) { my $error; # at this point we officially Do Not Care about the rating method if ( $included_calls > 0 ) { @@ -436,7 +442,9 @@ sub calc_usage { } die $error if $error; $formatter->append($cdr); - } + + $cdr_search->adjust(1) if $cdr->freesidestatus eq 'rated'; + } #$cdr } $formatter->finish; #writes into $details diff --git a/FS/FS/part_pkg/voip_inbound.pm b/FS/FS/part_pkg/voip_inbound.pm index f4e51836f..ecc4f47a9 100644 --- a/FS/FS/part_pkg/voip_inbound.pm +++ b/FS/FS/part_pkg/voip_inbound.pm @@ -227,13 +227,15 @@ sub calc_usage { ) { my $svc_phone = $cust_svc->svc_x; - foreach my $cdr ( $svc_phone->get_cdrs( + my $cdr_search = $svc_phone->psearch_cdrs( 'inbound' => 1, 'default_prefix' => 
$self->option('default_prefix'), 'status' => '', # unprocessed only 'for_update' => 1, - ) - ) { + ); + $cdr_search->limit(1000); + $cdr_search->increment(0); + while ( my $cdr = $cdr_search->fetch ) { my $reason = $self->check_chargable( $cdr, 'option_cache' => \%opt_cache, @@ -310,6 +312,8 @@ sub calc_usage { die $error if $error; $formatter->append($cdr); + $cdr_search->adjust(1) if $cdr->freesidestatus eq ''; + } #$cdr } # $cust_svc # unshift @$details, { format => 'C', diff --git a/FS/FS/part_pkg/voip_tiered.pm b/FS/FS/part_pkg/voip_tiered.pm index e5dcf6dd8..d8d74c13f 100644 --- a/FS/FS/part_pkg/voip_tiered.pm +++ b/FS/FS/part_pkg/voip_tiered.pm @@ -132,9 +132,11 @@ sub calc_usage { $options{'inbound'} = ( $pass eq 'inbound' ); - foreach my $cdr ( - $svc_x->get_cdrs( %options ) - ) { + my $cdr_search = $svc_x->psearch_cdrs(%options); + $cdr_search->limit(1000); + $cdr_search->increment(0); + while ( my $cdr = $cdr_search->fetch ) { + if ( $DEBUG > 1 ) { warn "rating CDR $cdr\n". join('', map { " $_ => ". $cdr->{$_}. "\n" } keys %$cdr ); @@ -173,6 +175,8 @@ sub calc_usage { $total += $charge_min; + $cdr_search->adjust(1) if $cdr->freesidestatus eq ''; + } # $cdr } # $pass @@ -213,9 +217,10 @@ sub calc_usage { # tell the formatter what we're sending it $formatter->inbound($options{'inbound'}); - foreach my $cdr ( - $svc_x->get_cdrs( %options ) - ) { + my $cdr_search = $svc_x->psearch_cdrs(%options); + $cdr_search->limit(1000); + $cdr_search->increment(0); + while ( my $cdr = $cdr_search->fetch ) { my $object = $options{'inbound'} ? 
$cdr->cdr_termination( 1 ) #1: inbound @@ -242,6 +247,8 @@ sub calc_usage { $formatter->append($cdr); + $cdr_search->adjust(1) if $cdr->freesidestatus eq 'processing-tiered'; + } # $cdr } # $pass diff --git a/FS/FS/svc_pbx.pm b/FS/FS/svc_pbx.pm index f8b96050d..4182a1315 100644 --- a/FS/FS/svc_pbx.pm +++ b/FS/FS/svc_pbx.pm @@ -3,6 +3,7 @@ package FS::svc_pbx; use strict; use base qw( FS::svc_External_Common ); use FS::Record qw( qsearch qsearchs dbh ); +use FS::PagedSearch qw( psearch ); use FS::Conf; use FS::cust_svc; use FS::svc_phone; @@ -259,11 +260,13 @@ sub _check_duplicate { return ''; } -=item get_cdrs +=item psearch_cdrs OPTIONS -Returns a set of Call Detail Records (see L<FS::cdr>) associated with this -service. By default, "associated with" means that the "charged_party" field of -the CDR matches the "title" field of the service. +Returns a paged search (L<FS::PagedSearch>) for Call Detail Records +associated with this service. By default, "associated with" means that +the "charged_party" field of the CDR matches the "title" field of the +service. To access the CDRs themselves, call "->fetch" on the resulting +object. =over 2 @@ -295,7 +298,7 @@ to allow title to indicate a range of IP addresses. =cut -sub get_cdrs { +sub psearch_cdrs { my($self, %options) = @_; my %hash = (); my @where = (); @@ -343,15 +346,26 @@ sub get_cdrs { my $extra_sql = ( keys(%hash) ? ' AND ' : ' WHERE ' ). join(' AND ', @where ) if @where; - my @cdrs = - qsearch( { + psearch( { 'table' => 'cdr', 'hashref' => \%hash, 'extra_sql' => $extra_sql, 'order_by' => "ORDER BY startdate $for_update", - } ); + } ); +} + +=item get_cdrs (DEPRECATED) + +Like psearch_cdrs, but returns all the L<FS::cdr> objects at once, in a +single list. Arguments are the same as for psearch_cdrs. This can take +an unreasonably large amount of memory and is best avoided. 
+ - @cdrs; +=cut + +sub get_cdrs { + my $self = shift; + my $psearch = $self->psearch_cdrs(@_); + qsearch ( $psearch->{query} ) } =back diff --git a/FS/FS/svc_phone.pm b/FS/FS/svc_phone.pm index b395ea605..1296c1e85 100644 --- a/FS/FS/svc_phone.pm +++ b/FS/FS/svc_phone.pm @@ -7,6 +7,7 @@ use Data::Dumper; use Scalar::Util qw( blessed ); use FS::Conf; use FS::Record qw( qsearch qsearchs dbh ); +use FS::PagedSearch qw( psearch ); use FS::Msgcat qw(gettext); use FS::part_svc; use FS::phone_device; @@ -648,11 +649,13 @@ sub cust_location_or_main { $cust_pkg ? $cust_pkg->cust_location_or_main : ''; } -=item get_cdrs +=item psearch_cdrs OPTIONS -Returns a set of Call Detail Records (see L<FS::cdr>) associated with this -service. By default, "associated with" means that either the "src" or the -"charged_party" field of the CDR matches the "phonenum" field of the service. +Returns a paged search (L<FS::PagedSearch>) for Call Detail Records +associated with this service. By default, "associated with" means that +either the "src" or the "charged_party" field of the CDR matches the +"phonenum" field of the service. To access the CDRs themselves, call +"->fetch" on the resulting object. =over 2 @@ -676,11 +679,16 @@ with the chosen prefix. =item by_svcnum: not supported for svc_phone +=item billsec_sum: Instead of returning all of the CDRs, return a single +record (as an L<FS::cdr> object) with the sum of the 'billsec' field over +the entire result set. + =back =cut -sub get_cdrs { +sub psearch_cdrs { + my($self, %options) = @_; my @fields; my %hash; @@ -739,18 +747,30 @@ sub get_cdrs { my $extra_sql = ( keys(%hash) ? ' AND ' : ' WHERE ' ). join(' AND ', @where ); - my @cdrs = - qsearch( { + psearch( { 'table' => 'cdr', 'hashref' => \%hash, 'extra_sql' => $extra_sql, 'order_by' => $options{'billsec_sum'} ? '' : "ORDER BY startdate $for_update", 'select' => $options{'billsec_sum'} ? 
'sum(billsec) as billsec_sum' : '*', - } ); + } ); +} + +=item get_cdrs (DEPRECATED) + +Like psearch_cdrs, but returns all the L<FS::cdr> objects at once, in a +single list. Arguments are the same as for psearch_cdrs. This can take +an unreasonably large amount of memory and is best avoided. - @cdrs; +=cut + +sub get_cdrs { + my $self = shift; + my $psearch = $self->psearch_cdrs(@_); + qsearch ( $psearch->{query} ) } + =back =head1 BUGS -- 2.11.0