voip.ms CDR import, #31835
authorMark Wells <mark@freeside.biz>
Wed, 14 Jan 2015 06:12:25 +0000 (22:12 -0800)
committerMark Wells <mark@freeside.biz>
Wed, 14 Jan 2015 06:12:33 +0000 (22:12 -0800)
FS/FS/Misc/Getopt.pm [new file with mode: 0644]
FS/FS/Record.pm
FS/FS/part_export/voip_ms.pm
FS/FS/part_pkg/voip_cdr.pm
FS/FS/svc_acct.pm
bin/cdr-voip_ms.import [new file with mode: 0755]

diff --git a/FS/FS/Misc/Getopt.pm b/FS/FS/Misc/Getopt.pm
new file mode 100644 (file)
index 0000000..973295b
--- /dev/null
@@ -0,0 +1,112 @@
+package FS::Getopt;
+
+=head1 NAME
+
+FS::Getopt - Getopt::Std for Freeside command line/cron scripts
+
+=head1 SYNOPSIS
+
+#!/usr/bin/perl
+
+use FS::Getopt;
+use FS::other_stuff;
+our %opt;
+
+getopts('AB');
+
+print "Option A: $opt{A}
+Option B: $opt{B}
+Start date: $opt{start}
+End date: $opt{end}
+Freeside user: $opt{user}
+Verbose mode: $DEBUG
+";
+
+=head1 DESCRIPTION
+
+This module provides a wrapper around Getopt::Std::getopts() that 
+automatically processes certain common command line options, and sets
+up a convenient environment for writing a script.
+
+Options will go into %main::opt, as if you had called getopts(..., \%opt).
+All options recognized by the wrapper use (and will always use) lowercase 
+letters as flags, so it's safe for a script to define its options as
+capital letters.
+
+Options recognized by the wrapper do not need to be included in the string
+argument to getopts().
+
+The following command line options are recognized:
+
+=over 4
+
+=item -v: Verbose mode. Sets $main::DEBUG.
+
+=item -s: Start date. If provided, FS::Getopt will parse it as a date 
+and set $opt{start} to the resulting Unix timestamp value. If parsing fails, 
+displays an error and exits.
+
+=item -e: End date. As for -s; sets $opt{end}.
+
+=back
+
+Calling getopts() also performs some additional setup: 
+
+=over 4
+
+=item Exports a function named &main::debug, which performs a warn() if 
+$DEBUG has a true value, and if not, does nothing. This should be used to
+output informational messages. (warn() is for warnings.)
+
+=item Captures the first command line argument after any switches and 
+sets $opt{user} to that value. If a value isn't provided, prints an error
+and exits.
+
+=item Loads L<FS::UID> and calls adminsuidsetup() to connect to the database.
+
+=back
+
+=cut
+
+use strict;
+use base 'Exporter';
+use Getopt::Std ();
+use FS::UID qw(adminsuidsetup);
+use FS::Misc::DateTime qw(parse_datetime day_end);
+
+our @EXPORT = qw( getopts debug );
+
+sub getopts {
+  my $optstring = shift;
+  my %opt;
+  $optstring .= 's:e:v';
+
+  Getopt::Std::getopts($optstring, \%opt);
+
+  $opt{user} = shift(@ARGV)
+    or die "Freeside username required.\n";
+  adminsuidsetup($opt{user})
+    or die "Failed to connect as user '$opt{user}'.\n";
+
+  # now we have config access
+  if ( $opt{s} ) {
+    $opt{start} = parse_datetime($opt{s})
+      or die "Unable to parse start date '$opt{s}'.\n";
+  }
+  if ( $opt{e} ) {
+    $opt{end} = parse_datetime($opt{e})
+      or die "Unable to parse start date '$opt{e}'.\n";
+    $opt{end} = day_end($opt{end});
+  }
+  if ( $opt{v} ) {
+    $main::DEBUG ||= $opt{v};
+  }
+
+  %main::opt = %opt;
+}
+
+sub debug {
+  warn(@_, "\n") if $main::DEBUG;
+}
+
+1;
index f2236b0..5494053 100644 (file)
@@ -3190,6 +3190,22 @@ sub count {
   $self->scalar_sql($sql, @_);
 }
 
+=item row_exists [ WHERE [, PLACEHOLDER ...] ]
+
+Convenience method for the common case of "SELECT 1 FROM table ... LIMIT 1"
+with optional (but almost always needed) WHERE.
+
+=cut
+
+sub row_exists {
+  my($self, $where) = (shift, shift);
+  my $table = $self->table or die 'row_exists called on object of class '.ref($self);
+  my $sql = "SELECT 1 FROM $table";
+  $sql .= " WHERE $where" if $where;
+  $sql .= " LIMIT 1";
+  $self->scalar_sql($sql, @_);
+}
+
 =back
 
 =head1 SUBROUTINES
index 53a4926..7766eac 100644 (file)
@@ -10,9 +10,14 @@ use URI::Escape;
 use JSON;
 use HTTP::Request::Common;
 use Cache::FileCache;
+use FS::Record qw(dbh);
+use FS::Misc::DateTime qw(parse_datetime);
+use DateTime;
 
 our $me = '[voip.ms]';
-our $DEBUG = 2;
+our $DEBUG = 0;
+# our $DEBUG = 1; # log requests
+# our $DEBUG = 2; # log requests and content of replies
 our $base_url = 'https://voip.ms/api/v1/rest.php';
 
 # cache cities and provinces
@@ -222,6 +227,9 @@ sub export_unsuspend {
   '';
 }
 
+################
+# PROVISIONING #
+################
 
 sub insert_subacct {
   my ($self, $svc_acct) = @_;
@@ -587,6 +595,142 @@ sub reload_cache {
   }
 }
 
+################
+# CALL DETAILS #
+################
+
+=item import_cdrs START, END
+
+Retrieves CDRs for calls in the date range from START to END and inserts them
+as a new CDR batch. On success, returns a new cdr_batch object. On failure,
+returns an error message. If there are no new CDRs, returns nothing.
+
+=cut
+
+sub import_cdrs {
+  my ($self, $start, $end) = @_;
+  $start ||= 0; # all CDRs ever
+  $end ||= time;
+  $DEBUG ||= $self->option('debug');
+
+  my $oldAutoCommit = $FS::UID::AutoCommit;
+  local $FS::UID::AutoCommit = 0;
+
+  ($start, $end) = ($end, $start) if $end < $start;
+  $start = DateTime->from_epoch(epoch => $start, time_zone => 'local');
+  $end = DateTime->from_epoch(epoch => $end, time_zone => 'local');
+  my $accountnum = $self->option('account');
+  my $cdr_batch;
+  # can't retrieve more than 92 days at a time
+  # actually, it's even less than that; on large batches their server
+  # sometimes cuts off in mid-sentence. so set the chunk size smaller.
+  while ( $start < $end ) {
+
+    my $this_end = $start->clone;
+    $this_end->add(days => 14);
+    if ($this_end > $end) {
+      $this_end = $end;
+    }
+
+    my $date_from = $start->strftime('%F');
+    my $date_to = $this_end->strftime('%F');
+    warn "retrieving CDRs from $date_from to $date_to\n" if $DEBUG;
+    my $timezone = $start->strftime('%z') / 100; # integer number of hours
+    my $result = $self->api_request('getCDR', {
+        date_from => $date_from,
+        date_to   => $date_to,
+        answered  => 1,
+        noanswer  => 1,
+        busy      => 1,
+        failed    => 1,
+        timezone  => $timezone,
+    });
+    if ( $result->{status} eq 'success' ) {
+      if (!$cdr_batch) {
+        # then create one
+        my $cdrbatchname = 'voip_ms-' . $self->exportnum . '-' . $end->epoch;
+        $cdr_batch = FS::cdr_batch->new({ cdrbatch => $cdrbatchname });
+        my $error = $cdr_batch->insert;
+        if ( $error ) {
+          dbh->rollback if $oldAutoCommit;
+          return $error;
+        }
+      }
+
+      foreach ( @{ $result->{cdr} } ) {
+        my $uniqueid = $_->{uniqueid};
+        # download ranges may overlap; avoid double-importing CDRs
+        if ( FS::cdr->row_exists("uniqueid = ?", $uniqueid) ) {
+          warn "skipped call with uniqueid = '$uniqueid' (already imported)\n"
+            if $DEBUG;
+          next;
+        }
+        # in this case, and probably in other cases in the near future,
+        # easier to do this than to create a FS::cdr::* format module
+        my $hash = {
+          disposition             => $_->{disposition},
+          calldate                => $_->{date},
+          dst                     => $_->{destination},
+          uniqueid                => $_->{uniqueid},
+          upstream_price          => $_->{total},
+          upstream_dst_regionname => $_->{description},
+          clid                    => $_->{callerid},
+          duration                => $_->{seconds},
+          billsec                 => $_->{seconds},
+          cdrbatchnum             => $cdr_batch->cdrbatchnum,
+        };
+        if ( $_->{date} ) {
+          $hash->{startdate} = parse_datetime($_->{date});
+        }
+        if ( $_->{account} eq $accountnum ) {
+          # calls made from the master account, not a subaccount
+          # charged_party will be set to the source number
+          $hash->{charged_party} = '';
+        } elsif ( $_->{account} =~ /^${accountnum}_(\w+)$/ ) {
+          $hash->{charged_party} = $1;
+        } else {
+          warn "skipped call with account = '$_->{account}'\n";
+          next;
+        }
+        if ( $_->{callerid} =~ /<(\w+)>$/ ) {
+          $hash->{src} = $1;
+        } elsif ( $_->{callerid} =~ /^(\w+)$/ ) {
+          $hash->{src} = $1;
+        } else {
+          # else what? they don't have a source number anywhere else
+          warn "skipped call with unparseable callerid '$_->{callerid}'\n";
+          next;
+        }
+
+        my $cdr = FS::cdr->new($hash);
+        my $error = $cdr->insert;
+        if ( $error ) {
+          dbh->rollback if $oldAutoCommit;
+          return "$error (uniqueid $_->{uniqueid})";
+        }
+      } # foreach @{ $result->{cdr} }
+
+    } elsif ( $result->{status} eq 'no_cdr' ) {
+      # normal result if there are no CDRs, duh
+      next; # there may still be more CDRs later
+    } else {
+      dbh->rollback if $oldAutoCommit;
+      return "$me error retrieving CDRs: $result->{status}";
+    }
+
+    # we've retrieved and inserted this sub-batch of CDRs
+    $start->add(days => 15);
+  } # while ( $start < $end )
+
+  if ( $cdr_batch ) {
+    dbh->commit if $oldAutoCommit;
+    return $cdr_batch;
+  } else {
+    # no CDRs were ever found
+    return;
+  }
+}
+
 ##############
 # API ACCESS #
 ##############
@@ -614,15 +758,21 @@ sub api_request {
     'Accept'        => 'text/json',
   );
 
-  warn "$me $method\n" . $request->as_string ."\n" if $DEBUG;
+  warn "$me $method\n" if $DEBUG;
+  warn $request->as_string ."\n" if $DEBUG > 1;
   my $ua = LWP::UserAgent->new;
   my $response = $ua->request($request);
-  warn "$me received\n" . $response->as_string ."\n" if $DEBUG;
+  warn "$me received\n" . $response->as_string ."\n" if $DEBUG > 1;
   if ( !$response->is_success ) {
     return { status => $response->content };
   }
 
-  return decode_json($response->content);
+  local $@;
+  my $decoded_response = eval { decode_json($response->content) };
+  if ( $@ ) {
+    die "Error parsing response:\n" . $response->content . "\n\n";
+  }
+  return $decoded_response;
 }
 
 =item api_insist METHOD, CONTENT
index 3791e56..89cb3de 100644 (file)
@@ -22,6 +22,7 @@ tie my %cdr_svc_method, 'Tie::IxHash',
   'svc_pbx.svcnum'     => 'Freeside service # (svc_pbx.svcnum)',
   'svc_pbx.ip.src'     => 'PBX name to source IP address',
   'svc_pbx.ip.dst'     => 'PBX name to destination IP address',
+  'svc_acct.username'  => 'Username (svc_acct.username)',
 ;
 
 tie my %rating_method, 'Tie::IxHash',
@@ -466,16 +467,20 @@ sub calc_usage {
     #my @invoice_details_sort;
 
     # for tagging invoice details
+    # (unfortunate; should be a svc_x class method or table_info item or 
+    # something)
     my $phonenum;
     if ( $svc_table eq 'svc_phone' ) {
       $phonenum = $svc_x->phonenum;
     } elsif ( $svc_table eq 'svc_pbx' ) {
       $phonenum = $svc_x->title;
+    } elsif ( $svc_table eq 'svc_acct' ) {
+      $phonenum = $svc_x->username;
     }
     $formatter->phonenum($phonenum);
 
     #first rate any outstanding CDRs not yet rated
-    # XXX eventually use an FS::Cursor for this
+    # use FS::Cursor for this starting in 4.x
     my $cdr_search = $svc_x->psearch_cdrs(%options);
     $cdr_search->limit(1000);
     $cdr_search->increment(0); # because we're changing their status as we go
index 354dd3a..ab86915 100644 (file)
@@ -40,6 +40,7 @@ use FS::Record qw( qsearch qsearchs fields dbh dbdef );
 use FS::Msgcat qw(gettext);
 use FS::UI::bytecount;
 use FS::UI::Web;
+use FS::PagedSearch qw( psearch ); # XXX in v4, replace with FS::Cursor
 use FS::part_pkg;
 use FS::part_svc;
 use FS::svc_acct_pop;
@@ -2380,65 +2381,94 @@ sub last_login_text {
   $self->last_login ? ctime($self->last_login) : 'unknown';
 }
 
-=item get_cdrs TIMESTAMP_START TIMESTAMP_END [ 'OPTION' => 'VALUE ... ]
+=item psearch_cdrs OPTIONS
+
+Returns a paged search (L<FS::PagedSearch>) for Call Detail Records
+associated with this service. For svc_acct, "associated with" means that
+either the "src" or the "charged_party" field of the CDR matches the
+"username" field of the service.
 
 =cut
 
-sub get_cdrs {
-  my($self, $start, $end, %opt ) = @_;
-
-  my $did = $self->username; #yup
-
-  my $prefix = $opt{'default_prefix'}; #convergent.au '+61'
-
-  my $for_update = $opt{'for_update'} ? 'FOR UPDATE' : '';
-
-  #SELECT $for_update * FROM cdr
-  #  WHERE calldate >= $start #need a conversion
-  #    AND calldate <  $end   #ditto
-  #    AND (    charged_party = "$did"
-  #          OR charged_party = "$prefix$did" #if length($prefix);
-  #          OR ( ( charged_party IS NULL OR charged_party = '' )
-  #               AND
-  #               ( src = "$did" OR src = "$prefix$did" ) # if length($prefix)
-  #             )
-  #        )
-  #    AND ( freesidestatus IS NULL OR freesidestatus = '' )
-
-  my $charged_or_src;
-  if ( length($prefix) ) {
-    $charged_or_src =
-      " AND (    charged_party = '$did' 
-              OR charged_party = '$prefix$did'
-              OR ( ( charged_party IS NULL OR charged_party = '' )
-                   AND
-                   ( src = '$did' OR src = '$prefix$did' )
-                 )
-            )
-      ";
-  } else {
-    $charged_or_src = 
-      " AND (    charged_party = '$did' 
-              OR ( ( charged_party IS NULL OR charged_party = '' )
-                   AND
-                   src = '$did'
-                 )
-            )
-      ";
+sub psearch_cdrs {
+  my($self, %options) = @_;
+  my @fields;
+  my %hash;
+  my @where;
+
+  my $did = dbh->quote($self->username);
+
+  my $prefix = $options{'default_prefix'} || ''; #convergent.au '+61'
+  my $prefixdid = dbh->quote($prefix . $self->username);
+
+  my $for_update = $options{'for_update'} ? 'FOR UPDATE' : '';
 
+  if ( $options{inbound} ) {
+    # these will be selected under their DIDs
+    push @where, "FALSE";
+  }
+
+  my @orwhere;
+  if (!$options{'disable_charged_party'}) {
+    push @orwhere,
+      "charged_party = $did",
+      "charged_party = $prefixdid";
+  }
+  if (!$options{'disable_src'}) {
+    push @orwhere,
+      "src = $did AND charged_party IS NULL",
+      "src = $prefixdid AND charged_party IS NULL";
+  }
+  push @where, '(' . join(' OR ', @orwhere) . ')';
+
+  # $options{'status'} = '' is meaningful; for the rest of them it's not
+  if ( exists $options{'status'} ) {
+    $hash{'freesidestatus'} = $options{'status'};
+  }
+  if ( $options{'cdrtypenum'} ) {
+    $hash{'cdrtypenum'} = $options{'cdrtypenum'};
   }
+  if ( $options{'calltypenum'} ) {
+    $hash{'calltypenum'} = $options{'calltypenum'};
+  }
+  if ( $options{'begin'} ) {
+    push @where, 'startdate >= '. $options{'begin'};
+  } 
+  if ( $options{'end'} ) {
+    push @where, 'startdate < '.  $options{'end'};
+  } 
+  if ( $options{'nonzero'} ) {
+    push @where, 'duration > 0';
+  } 
 
-  qsearch(
-    'select'    => "$for_update *",
+  my $extra_sql = join(' AND ', @where);
+  if ($extra_sql) {
+    if (keys %hash) {
+      $extra_sql = " AND ".$extra_sql;
+    } else {
+      $extra_sql = " WHERE ".$extra_sql;
+    }
+  }
+  return psearch({
+    'select'    => '*',
     'table'     => 'cdr',
-    'hashref'   => {
-                     #( freesidestatus IS NULL OR freesidestatus = '' )
-                     'freesidestatus' => '',
-                   },
-    'extra_sql' => $charged_or_src,
+    'hashref'   => \%hash,
+    'extra_sql' => $extra_sql,
+    'order_by'  => "ORDER BY startdate $for_update",
+  });
+}
 
-  );
+=item get_cdrs (DEPRECATED)
+
+Like psearch_cdrs, but returns all the L<FS::cdr> objects at once, in a 
+single list. Arguments are the same as for psearch_cdrs.
+
+=cut
 
+sub get_cdrs {
+  my $self = shift;
+  my $psearch = $self->psearch_cdrs(@_);
+  qsearch ( $psearch->{query} )
 }
 
 # sub radius_groups has moved to svc_Radius_Mixin
diff --git a/bin/cdr-voip_ms.import b/bin/cdr-voip_ms.import
new file mode 100755 (executable)
index 0000000..1b4805d
--- /dev/null
@@ -0,0 +1,70 @@
+#!/usr/bin/perl
+
+use strict;
+use FS::Misc::Getopt;
+use FS::cdr_batch;
+use FS::part_export;
+use FS::Record qw(qsearch qsearchs dbh);
+use Date::Format 'time2str';
+
+###
+# parse command line
+###
+
+our %opt;
+getopts('');
+
+$FS::UID::AutoCommit = 0;
+
+my @exports = qsearch('part_export', { exporttype => 'voip_ms' });
+if (!@exports) {
+  die "There are no voip.ms exports configured.\n";
+}
+
+foreach my $part_export (@exports) {
+  debug "Account #".$part_export->option('account');
+
+  if (!$opt{start}) {
+    # find the most recently downloaded batch
+    my $exportnum = $part_export->exportnum;
+    my $most_recent = qsearchs({
+        'table'     => 'cdr_batch',
+        'hashref'   => { 'cdrbatch' => {op=>'like',
+                                        value=>'voip_ms-' . $exportnum . '-%'}
+                       },
+        'order_by'  => 'ORDER BY _date DESC LIMIT 1',
+    });
+    if ( $most_recent ) {
+      $most_recent->cdrbatch =~ /-(\d+)$/; # extract the end timestamp
+      $opt{start} = $1;
+      debug "Downloading records since most recent batch: ".
+            time2str('%Y-%m-%d', $opt{start});
+    } else {
+      $opt{start} = 0;
+      debug "Downloading records since time = zero.";
+    }
+  }
+
+  $opt{end} ||= time;
+
+  my $error_or_batch = $part_export->import_cdrs( $opt{start}, $opt{end} );
+  if ( ref $error_or_batch ) {
+    debug "Created batch #".$error_or_batch->cdrbatchnum;
+    dbh->commit;
+  } elsif ( $error_or_batch ) {
+    warn $error_or_batch;
+    dbh->rollback;
+  } else {
+    debug "No CDRs found."
+  }
+}
+
+sub usage {
+  "Usage: \n  cdr-voip_ms.import [ options ] user
+  Options:
+    -v: be verbose
+    -s date: start date (defaults to the most recent batch date)
+    -e date: end date
+";
+}
+