Tripal v1.0 (6.x-1.0)
tripal_bulk_loader.loader.inc
Go to the documentation of this file.
00001 <?php
00002 
/**
 * Form builder: job controls (submit / re-submit, cancel, revert) for a bulk
 * loader node.
 *
 * The node id, data file and current job id are carried as hidden elements so
 * that the submit handler can read them from $form_state['values'].
 *
 * @param $form_state
 *   The form state array (not read by this builder).
 * @param $node
 *   The bulk loader node. The properties read here are: nid, file, job_id,
 *   job_status and keep_track_inserted.
 *
 * @return
 *   The form array.
 */
function tripal_bulk_loader_add_loader_job_form($form_state, $node) {
  // The cancel and revert buttons are only rendered as real buttons once the
  // node has a job id; otherwise they are emitted as hidden elements.
  $job_button_type = ($node->job_id) ? 'submit' : 'hidden';
  $submit_label = ($node->job_id) ? 'Re-Submit Job' : 'Submit Job';

  // --notify--
  // Warn the user that the summary lags behind while a load is in progress.
  if ($node->job_status == 'Loading...') {
    $notice = t("The Loading Summary only updates at the end of each constant set.
      Although records may have already been inserted, they won't be available until the
      current constant set is full loaded and no errors are encountered.", array());
    drupal_set_message($notice, 'warning');
  }

  $form = array(
    'nid' => array(
      '#type' => 'hidden',
      '#value' => $node->nid,
    ),
    'file' => array(
      '#type' => 'hidden',
      '#value' => $node->file,
    ),
    'job_id' => array(
      '#type' => 'hidden',
      '#value' => $node->job_id,
    ),
    'submit' => array(
      '#type' => 'submit',
      '#value' => $submit_label,
    ),
    'submit-cancel' => array(
      '#type' => $job_button_type,
      '#value' => 'Cancel Job',
    ),
  );

  // Reverting is only offered when the node tracks the ids it inserted.
  if ($node->keep_track_inserted) {
    $form['submit-revert'] = array(
      '#type' => $job_button_type,
      '#value' => 'Revert',
    );
  }

  return $form;
}
00058 
/**
 * Submit handler for tripal_bulk_loader_add_loader_job_form().
 *
 * Dispatches on the label of the button that was pressed
 * ($form_state['values']['op']):
 *  - 'Submit Job':    queue a new Tripal job for the node's data file.
 *  - 'Re-Submit Job': re-run the job already attached to the node.
 *  - 'Cancel Job':    cancel the attached job.
 *  - 'Revert':        delete the records listed in
 *                     {tripal_bulk_loader_inserted} for this node.
 *
 * The labels are compared exactly. An unanchored pattern match on
 * 'Submit Job' would also match 'Re-Submit Job', which made the re-submit
 * branch unreachable and queued a brand new job on every re-submit.
 *
 * @param $form
 *   The form array (not read).
 * @param $form_state
 *   The form state; the values read are op, nid, file and job_id.
 */
function tripal_bulk_loader_add_loader_job_form_submit($form, $form_state) {
  global $user;

  $values = $form_state['values'];
  $op = $values['op'];
  $nid = $values['nid'];

  if ($op === 'Submit Job') {
    // Submit Tripal Job
    $job_args = array(1 => $nid);
    if (is_readable($values['file'])) {
      $fname = basename($values['file']);
      $job_id = tripal_add_job("Bulk Loading Job: $fname", 'tripal_bulk_loader', 'tripal_bulk_loader_load_data', $job_args, $user->uid);

      // add job_id to bulk_loader node
      db_query("UPDATE {tripal_bulk_loader} SET job_id=%d WHERE nid=%d", $job_id, $nid);

      // change status
      db_query("UPDATE {tripal_bulk_loader} SET job_status='%s' WHERE nid=%d", 'Submitted to Queue', $nid);
    }
    else {
      drupal_set_message(t("Can not open %file. Job not scheduled.", array('%file' => $values['file'])));
    }
  }
  elseif ($op === 'Re-Submit Job') {
    tripal_jobs_rerun($values['job_id']);
    db_query("UPDATE {tripal_bulk_loader} SET job_status='%s' WHERE nid=%d", 'Submitted to Queue', $nid);
  }
  elseif ($op === 'Cancel Job') {
    db_query("UPDATE {tripal_bulk_loader} SET job_status='%s' WHERE nid=%d", 'Job Cancelled', $nid);
    tripal_jobs_cancel($values['job_id']);
  }
  elseif ($op === 'Revert') {

    // Remove the records from the database that were already inserted.
    // Rows are walked newest-first (descending tracking id). The table name,
    // key column and id list are substituted as strings; they come from the
    // loader's own tracking table, not from the submitted form.
    $resource = db_query('SELECT * FROM {tripal_bulk_loader_inserted} WHERE nid=%d ORDER BY tripal_bulk_loader_inserted_id DESC', $nid);
    while ($r = db_fetch_object($resource)) {
      db_query('DELETE FROM %s WHERE %s IN (%s)', $r->table_inserted_into, $r->table_primary_key, $r->ids_inserted);

      // Verify the delete: if any of the ids can still be selected the
      // removal failed and the tracking row is kept.
      $result = db_fetch_object(db_query('SELECT true as present FROM %s WHERE %s IN (%s)', $r->table_inserted_into, $r->table_primary_key, $r->ids_inserted));
      // db_fetch_object() yields FALSE when no row is left, so test the
      // result itself before reading a property from it.
      $still_present = ($result && $result->present);
      if (!$still_present) {
        drupal_set_message(t('Successfully Removed data Inserted into the %tableto table.', array('%tableto' => $r->table_inserted_into)));
        db_query('DELETE FROM {tripal_bulk_loader_inserted} WHERE tripal_bulk_loader_inserted_id=%d', $r->tripal_bulk_loader_inserted_id);
      }
      else {
        drupal_set_message(t('Unable to remove data Inserted into the %tableto table!', array('%tableto' => $r->table_inserted_into)), 'error');
      }
    }

    // reset status
    db_query("UPDATE {tripal_bulk_loader} SET job_status='%s' WHERE nid=%d", 'Reverted -Data Deleted', $nid);
  }

}
00112 
/**
 * Tripal job callback: loads the data file of a bulk loader node using the
 * node's template, once per constant set.
 *
 * Outline:
 *  1. Mark the node as 'Loading...' and count the lines of the data file.
 *  2. Turn the template into a per-record "default data" array (table, mode,
 *     flags, required fields, regex transforms and initial values).
 *  3. For every constant set: overlay the constants, open the file, optionally
 *     start a transaction / defer constraints / lock the insert tables, then
 *     hand every non-blank line to process_data_array_for_line() for every
 *     enabled record.
 *  4. Store the final status ('Loading Completed Successfully' or
 *     'Errors Encountered') on the node.
 *
 * Loading stops at the first record that reports an error. If the data file
 * cannot be opened the problem is logged and the job ends with the status
 * 'Errors Encountered'.
 *
 * @param $nid
 *   The node id of the bulk loader node to load.
 * @param $job_id
 *   The Tripal job id; passed on to the progress file tracker.
 */
function tripal_bulk_loader_load_data($nid, $job_id) {

  // ensure no timeout
  set_time_limit(0);

  // set the status of the job (in the node not the tripal jobs)
  db_query("UPDATE {tripal_bulk_loader} SET job_status='%s' WHERE nid=%d", 'Loading...', $nid);

  $node = node_load($nid);
  print "Template: " . $node->template->name . " (" . $node->template_id . ")\n";

  // Count the lines with wc. The path is quoted with escapeshellarg() so that
  // spaces or shell metacharacters in the file name cannot break (or inject
  // into) the command.
  $total_lines = (int) trim((string) shell_exec('wc --lines < ' . escapeshellarg($node->file)));
  print "File: " . $node->file . " (" . $total_lines . " lines)\n";

  print "\nClearing all prepared statements from previous runs of this loader...\n";
  tripal_core_chado_clear_prepared('_' . $node->nid . '_');

  // Prep Work ==================================================================================
  print "\nPreparing to load...\n";
  $loaded_without_errors = TRUE;

  // Generate default values array
  $default_data = array();
  $field2column = array();
  $record2priority = array();
  $tables = array();
  $template_array = $node->template->template_array;

  // first build the record2priority array (record id => priority), which is
  // needed to resolve foreign key references between records
  foreach ($template_array as $priority => $record_array) {
    // skip template entries that are not records, as the main loop below does
    if (!is_array($record_array)) {
      continue;
    }
    $record2priority[$record_array['record_id']] = $priority;
  }

  // then build the default data for each record
  foreach ($template_array as $priority => $record_array) {
    if (!is_array($record_array)) {
      continue;
    }

    // Add tables being inserted into to a list to be treated differently
    // this is used to acquire locks on these tables
    if (preg_match('/insert/', $record_array['mode'])) {
      $tables[$record_array['table']] = $record_array['table'];
    }

    // iterate through each of the fields for the current record and
    // set the default_data array. The record-level settings are (re)assigned
    // for every field, so a record without fields gets no default data entry.
    foreach ($record_array['fields'] as $field_index => $field_array) {

      $default_data[$priority]['table'] = $record_array['table'];
      $default_data[$priority]['mode'] = ($record_array['mode']) ? $record_array['mode'] : 'insert';
      $default_data[$priority]['select_if_duplicate'] = ($record_array['select_if_duplicate']) ? $record_array['select_if_duplicate'] : 0;
      $default_data[$priority]['update_if_duplicate'] = ($record_array['update_if_duplicate']) ? $record_array['update_if_duplicate'] : 0;
      $default_data[$priority]['disabled'] = ($record_array['disable']) ? $record_array['disable'] : 0;
      $default_data[$priority]['optional'] = ($record_array['optional']) ? $record_array['optional'] : 0;
      $default_data[$priority]['select_optional'] = ($record_array['select_optional']) ? $record_array['select_optional'] : 0;
      $default_data[$priority]['record_id'] = $record_array['record_id'];
      $default_data[$priority]['required'][$field_array['field']] = $field_array['required'];

      if (isset($field_array['regex'])) {
        $default_data[$priority]['regex_transform'][$field_array['field']] = $field_array['regex'];
      }

      if (preg_match('/table field/', $field_array['type'])) {
        // value comes from a column of the data file; filled in per line
        $default_data[$priority]['values_array'][$field_array['field']] = '';
        $default_data[$priority]['need_further_processing'] = TRUE;
        $field2column[$priority][$field_array['field']] = $field_array['spreadsheet column'];

      }
      elseif (preg_match('/constant/', $field_array['type'])) {
        $default_data[$priority]['values_array'][$field_array['field']] = $field_array['constant value'];

      }
      elseif (preg_match('/foreign key/', $field_array['type'])) {
        $default_data[$priority]['values_array'][$field_array['field']] = array();
        $default_data[$priority]['need_further_processing'] = TRUE;
        $default_data[$priority]['values_array'][$field_array['field']]['foreign record']['record'] = $field_array['foreign key'];

        // Add in the FK / Referral table
        $fk_priority = $record2priority[$field_array['foreign key']];
        $fk_table = $template_array[$fk_priority]['table'];
        $default_data[$priority]['values_array'][$field_array['field']]['foreign record']['table'] = $fk_table;

        // Add in the FK / Referral field
        // for backwards compatibility we need to get the FK relationship to find
        // out what field we're joining on.  For templates created using a
        // previous version it was assumed that the FK field was always the field to join
        if (!array_key_exists('foreign field', $field_array)) {
          $tbl_description = tripal_core_get_chado_table_schema($record_array['table']);
          foreach ($tbl_description['foreign keys'] as $key_table => $key_array) {
            if ($key_table == $fk_table) {
              foreach ($key_array['columns'] as $left_field => $right_field) {
                if ($left_field == $field_array['field']) {
                  $field_array['foreign field'] = $right_field;
                }
              }
            }
          }
        }
        $default_data[$priority]['values_array'][$field_array['field']]['foreign record']['field'] = $field_array['foreign field'];
      }
      else {
        print 'WARNING: Unsupported type: ' . $field_array['type'] . ' for ' . $record_array['table'] . '.' . $field_array['field'] . "!\n";
      }

    } // end of foreach field
  } //end of foreach record

  // For each set of constants
  print "Loading...\n";
  $original_default_data = $default_data;
  $group_index = 0;
  $total_num_groups = sizeof($node->constants);
  foreach ($node->constants as $group_id => $set) {
    // revert default data array for next set of constants
    $default_data = $original_default_data;
    $group_index++;

    // Add constants
    if (!empty($set)) {
      print "Constants:\n";
      foreach ($set as $priority => $record) {
        foreach ($record as $field_id => $field) {

          print "\t- " . $field['chado_table'] . '.' . $field['chado_field'] . ' = ' . $field['value'] . "\n";

          // the constant must still point at a table/field of the template
          $constant_is_valid = ($default_data[$priority]['table'] == $field['chado_table']
            and isset($default_data[$priority]['values_array'][$field['chado_field']]));
          if (!$constant_is_valid) {
            print "ERROR: Template has changed after constants were assigned!\n";
            watchdog('T_bulk_loader', 'Template has changed after constants were assigned', array(), WATCHDOG_NOTICE);
            exit(1);
          }

          if (isset($field2column[$priority][$field['chado_field']])) {
            $field2column[$priority][$field['chado_field']] = $field['value'];
          }
          else {
            $default_data[$priority]['values_array'][$field['chado_field']] = $field['value'];
          }
        }
      }
    }

    // Open File
    print "\tPreparing to load the current constant set...\n";
    print "\t\tOpen File...\n";
    // SplFileObject reports a failure to open by throwing, not by returning
    // a falsy value, so the failure has to be caught.
    try {
      $file = new SplFileObject($node->file, 'r');
    }
    catch (Exception $e) {
      watchdog('T_bulk_loader', 'Could not open file %file',
        array('%file' => $node->file), WATCHDOG_ERROR);
      $loaded_without_errors = FALSE;
      break;
    }

    // Set defaults
    $header = '';
    if (preg_match('/(t|true|1)/', $node->file_has_header)) {
      // consume the header line so that it is not loaded as data
      $file->next();
      $header = $file->current();
    }
    $num_lines = 0;
    // number of lines between two job progress updates (0.01% of the file,
    // but at least every line)
    $interval = intval($total_lines * 0.0001);
    if ($interval == 0) {
      $interval = 1;
    }

    // Start Transaction
    // $transactions must be defined for the 'none' mode as well.
    $transactions = FALSE;
    $savepoint = '';
    switch (variable_get('tripal_bulk_loader_transactions', 'row')) {
      case "none":
        break;
      case "all":
        print "\t\tStart Transaction...\n";
        tripal_db_start_transaction();
        $transactions = TRUE;
        $savepoint = "";
        break;
      case "row":
        print "\t\tStart Transaction...\n";
        tripal_db_start_transaction();
        $transactions = TRUE;
        $savepoint = "last_row_complete";
        break;
    }

    // Defer constraints
    $triggers_disabled = FALSE;
    if ($transactions AND variable_get('tripal_bulk_loader_disable_triggers', TRUE)) {
      print "\t\tDefer Constraints...\n";
      $triggers_disabled = TRUE;
      chado_query("SET CONSTRAINTS ALL DEFERRED");
    }

    // Acquire Locks
    if ($transactions) {
      print "\t\tAcquiring Table Locks...\n";
      $lockmode = variable_get('tripal_bulk_loader_lock', 'ROW EXCLUSIVE');
      foreach ($tables as $table) {
        print "\t\t\t$lockmode for $table\n";
        chado_query("LOCK TABLE %s IN %s MODE", $table, $lockmode);
      }
    }

    print "\tLoading the current constant set...\n";
    tripal_bulk_loader_progress_bar(0, $total_lines);
    // $failed must be defined even when the file yields no data line.
    $failed = FALSE;
    while (!$file->eof()) {
      $file->next();
      $raw_line = $file->current();
      $raw_line = trim($raw_line);
      if (empty($raw_line)) {
        continue;
      } // skips blank lines
      $line = explode("\t", $raw_line);
      $num_lines++;

      // update the job status every $interval lines processed for the current group
      if ($node->job_id and $num_lines % $interval == 0) {

        // percentage of lines processed for the current group; wc reports 0
        // for a file without a trailing newline, so avoid dividing by zero
        $group_progress = ($total_lines > 0) ? round(($num_lines / $total_lines) * 100) : 100;
        tripal_bulk_loader_progress_bar($num_lines, $total_lines);

        // percentage of lines processed for all groups
        // <previous group index> * 100 + <current group progress>
        // --------------------------------------------------------
        //               <total number of groups>
        // For example, if you were in the third group of 3 constant sets
        // and had a group percentage of 50% then the job progress would be
        // (2*100 + 50%) / 3 = 250%/3 = 83%
        $job_progress = round(((($group_index - 1) * 100) + $group_progress) / $total_num_groups);
        tripal_job_set_progress($node->job_id, $job_progress);
      }

      $data = $default_data;

      // the additional options are the same for every record of this line
      $options = array(
        'field2column' => $field2column,
        'record2priority' => $record2priority,
        'line' => $line,
        'line_num' => $num_lines,
        'group_index' => $group_index,
        'node' => $node,
        'nid' => $node->nid,
      );

      // iterate through each record and process the line
      $data_keys = array_keys($data);
      foreach ($data_keys as $priority) {

        // execute all records that are not disabled
        $no_errors = FALSE;
        if (array_key_exists($priority, $data) and
            array_key_exists('disabled', $data[$priority]) and
            $data[$priority]['disabled'] == 0) {
          $no_errors = process_data_array_for_line($priority, $data, $default_data, $options);
        }
        else {
          // set status to true for skipped records
          $no_errors = TRUE;
        }

        tripal_bulk_loader_progress_file_track_job($job_id, $no_errors);
        $failed = FALSE;
        if ( !$no_errors ) {
          // Encountered an error
          if ($transactions) {
            tripal_db_rollback_transaction($savepoint);
          }
          $failed = TRUE;
          break;
        }
      } // end of foreach table in default data array

      tripal_bulk_loader_progress_file_track_job($job_id, FALSE, TRUE);

      if ($failed) {
        break;
      }
      else {
        // Row inserted successfully
        // Set savepoint if supplied
        if ($savepoint) {
          if ($num_lines == 1) {
            tripal_db_set_savepoint_transaction($savepoint);
          }
          else {
            // Tell it to remove the previous savepoint of the same name
            tripal_db_set_savepoint_transaction($savepoint, TRUE);
          }
        }
      }
    } //end of foreach line of file

    // END Transaction
    if ($transactions) {
      // end the transaction
      tripal_db_commit_transaction();
    }

    if ($failed) {
      $loaded_without_errors = FALSE;
      break;
    }

    tripal_bulk_loader_progress_bar($total_lines, $total_lines);
    tripal_bulk_loader_progress_file_track_job($job_id, FALSE, FALSE, TRUE);
  } //end of foreach constant set

  // set the status of the job (in the node not the tripal jobs)
  if ($loaded_without_errors) {
    $status = 'Loading Completed Successfully';
  }
  else {
    $status = 'Errors Encountered';
  }
  db_query("UPDATE {tripal_bulk_loader} SET job_status='%s' WHERE nid=%d", $status, $nid);

}
00458 
00472 function process_data_array_for_line($priority, &$data, &$default_data, $addt) {
00473 //$time_start = microtime(true);
00474 
00475   $table_data = $data[$priority];
00476   $addt = (object) $addt;
00477   $no_errors = TRUE;
00478 
00479   $table = $table_data['table'];
00480   $values = $table_data['values_array'];
00481 
00482   // populate the values array with real value either from the input data file line
00483   // or from the foreign key / referral record
00484   if (array_key_exists('need_further_processing', $table_data) and $table_data['need_further_processing']) {
00485     if (array_key_exists($priority, $addt->field2column)) {
00486       $values = tripal_bulk_loader_add_spreadsheetdata_to_values($values, $addt->line, $addt->field2column[$priority]);
00487     }
00488     $values = tripal_bulk_loader_add_foreignkey_to_values($table_data, $values, $data, $addt->record2priority, $addt->nid, $priority, $default_data);
00489   }
00490 
00491   $values = tripal_bulk_loader_regex_tranform_values($values, $table_data, $addt->line);
00492   if (!$values) {
00493     //watchdog('T_bulk_loader', 'Line ' . $addt->line_num . ' Regex:<pre>' . print_r($values, TRUE) . print_r($table_data, TRUE) . '</pre>' . '</pre>', array(), WATCHDOG_NOTICE);
00494   }
00495 
00496   // get the table description
00497   $table_desc = tripal_core_get_chado_table_schema($table);
00498 
00499   // Check that template required fields are present. if a required field is
00500   // missing and this
00501   // is an optional record then just return. otherwise raise an error
00502   $skip_optional = 0;
00503   foreach ($table_data['required'] as $field => $required) {
00504     if ($required) {
00505       // check if the field has no value (or array is empty)
00506       if (!isset($values[$field]) or
00507           (is_array($values[$field]) and count($values[$field]) == 0)) {
00508         // check if the record is optional.  For backwards compatiblity we need to
00509         // check if the 'mode' is set to 'optional'
00510         if ($table_data['optional'] or preg_match('/optional/', $table_data['mode']) or
00511             $table_data['select_optional'])  {
00512           $skip_optional = 1;
00513           // set the values array to be empty since we all required fields are
00514           // optional and we can't do a select/insert so we don't want to keep
00515           // the values if this record is used in a later FK relationship.
00516           $values = array();
00517         }
00518         else {
00519           $msg = "\nLine " . $addt->line_num . ' "' . $table_data['record_id'] .
00520             '" (' . $table_data['mode'] . ') Missing template required value: ' . $table . '.' . $field;
00521           watchdog('T_bulk_loader', $msg, array(), WATCHDOG_WARNING);
00522           $data[$priority]['error'] = TRUE;
00523           $no_errors = FALSE;
00524         }
00525       }
00526     }
00527   }
00528 
00529   // for an insert, check that all database required fields are present in the values array
00530   // we check for 'optional' in the mode for backwards compatibility. The 'optional'
00531   // mode used to be a type of insert
00532   if (!$skip_optional and (preg_match('/insert/', $table_data['mode']) or
00533        preg_match('/optional/', $table_data['mode']))) {
00534     // Check all database table required fields are set
00535     $fields = $table_desc['fields'];
00536     foreach ($fields as $field => $def) {
00537       // a field is considered missing if it cannot be null and there is no default
00538       // value for it or it is not of type 'serial'
00539       if (array_key_exists('not null', $def) and $def['not null'] == 1 and   // field must have a value
00540           !array_key_exists($field, $values) and                             // there is not a value for it
00541           !array_key_exists('default', $def) and                             // there is no default for it
00542           strcmp($def['type'], 'serial') != 0) {                             // it is not a 'serial' type column
00543         $msg = "\nLine " . $addt->line_num . ' ' . $table_data['record_id'] .
00544                ' (' . $table_data['mode'] . ') Missing Database Required Value: ' . $table . '.' . $field;
00545         watchdog('T_bulk_loader', $msg, array(), WATCHDOG_ERROR);
00546         $data[$priority]['error'] = TRUE;
00547       }
00548     }
00549   }
00550 
00551   // add updated values array into the data array
00552   $data[$priority]['values_array'] = $values;
00553 
00554   // if there was an error already -> don't insert
00555   if (array_key_exists('error', $data[$priority]) and $data[$priority]['error']) {
00556     watchdog('T_bulk_loader','Skipping processing of %table due to previous errors',array('%table'=>$table),WATCHDOG_NOTICE);
00557     return $no_errors;
00558   }
00559 
00560   // skip optional fields
00561   if ($skip_optional) {
00562     watchdog('T_bulk_loader','Skipping an optional record (%record)',array('%record'=>$table_data['record_id']),WATCHDOG_NOTICE);
00563     return $no_errors;
00564   }
00565 
00566   // check if it is already inserted
00567   if (array_key_exists('inserted', $table_data) and $table_data['inserted']) {
00568     watchdog('T_bulk_loader','Skipping %record since it is already inserted',array('%record'=>$table_data['record_id']),WATCHDOG_NOTICE);
00569     return $no_errors;
00570   }
00571 
00572   // check if it is already selected, if so, just get the value stored in
00573   // the default_data array
00574   if (array_key_exists('selected', $table_data) and $table_data['selected']) {
00575     $data[$priority]['values_array'] = $default_data[$priority]['values_array'];
00576     watchdog('T_bulk_loader','%record was already selected thus we are just returning the values previously selected.',array('%record'=>$table_data['record_id']),WATCHDOG_NOTICE);
00577     return $no_errors;
00578   }
00579 
00580   // make sure we have some value in the select_if_duplicate and update_if_duplicate options
00581   if (!array_key_exists('select_if_duplicate', $table_data)) {
00582     $table_data['select_if_duplicate'] = 0;
00583   }
00584   if (!array_key_exists('update_if_duplicate', $table_data)) {
00585     $table_data['update_if_duplicate'] = 0;
00586   }
00587 
00588   // if "select if duplicate" is enabled then check to ensure unique constraint is not violoated.
00589   // If it is violoated then simply return, the record already exists in the database.
00590   // We check for "insert_unique" for backwards compatibilty but that mode no longer exists
00591   $data[$priority]['is_duplicate'] = 0;
00592   if (preg_match('/insert_unique/', $table_data['mode']) or
00593      $table_data['select_if_duplicate'] == 1 or $table_data['update_if_duplicate'] == 1) {
00594     $options = array('is_duplicate' => TRUE);
00595     $duplicate = tripal_core_chado_select($table, array_keys($table_desc['fields']), $values, $options);
00596 
00597     // if this is a duplicate then substitute the values in the table_data array so
00598     // that for future records that may depend on this one, they can get the values needed
00599     if ($duplicate and is_array($duplicate) and count($duplicate) == 1) {
00600       $dup_record = $duplicate[0];
00601       // save the duplicate record for later.  If this is an update_if_duplicate
00602       // then we'll need this record as the match
00603       $data[$priority]['is_duplicate'] = (array) $dup_record;
00604 
00605       // if all we have is one field then we will just use the value returned
00606       // rather than create an array of values. This way it will prevent
00607       // the tripal_core_chado_(select|insert|update) from recursing on
00608       // foreign keys and make the loader go faster.
00609       if (count((array) $dup_record) == 1) {
00610         foreach ($dup_record as $key => $value) {
00611           $data[$priority]['values_array'] = $value;
00612         }
00613       }
00614       // if we have multiple fields returned then we need to set the values
00615       // the new array.
00616       else {
00617         // convert object to array
00618         $new_values = array();
00619         foreach ($dup_record as $key => $value) {
00620           $new_values[$key] = $value;
00621         }
00622         $data[$priority]['values_array'] = $new_values;
00623       }
00624       // return if this is a select_if_duplicate
00625       if ($table_data['select_if_duplicate'] == 1) {
00626         watchdog('T_bulk_loader','Simply returning values for %record since it was already inserted',array('%record'=>$table_data['record_id']),WATCHDOG_NOTICE);
00627         return $no_errors;
00628       }
00629     }
00630   }
00631   else {
00632     # TODO: what to do if there are more than one value returned when
00633     # checking for a duplicate?
00634   }
00635 
00636   if (!preg_match('/select/', $table_data['mode'])) {
00637     // Use prepared statement?
00638     if (variable_get('tripal_bulk_loader_prepare', TRUE)) {
00639       $options = array('statement_name' =>  'record_' . $addt->nid . '_' . $priority);
00640       if (($addt->line_num > 1 && $addt->group_index == 1) OR $addt->group_index > 1) {
00641         //$options['is_prepared'] = TRUE;
00642       }
00643     }
00644     else {
00645       $options = array();
00646     }
00647     // Skip tripal_core_chado_insert() built-in validation?
00648     if (variable_get('tripal_bulk_loader_skip_validation', FALSE)) {
00649       $options['skip_validation'] = TRUE;
00650     }
00651 
00652     if ($table_data['update_if_duplicate'] == 1) {
00653       if (array_key_exists('statement_name', $options)) {
00654         $options['statement_name'] = 'upd_' . $options['statement_name'];
00655       }
00656       // This should have been set on the first round of inserts for this record
00657       $match = $data[$priority]['is_duplicate'];
00658       // However, sometimes there is a pre-existing record before the loader starts
00659                         // Thus check that this value is set and if not, then generate a match array
00660                         // based on the unique keys for this record.
00661                         if (empty($match)) {
00662                                 $match = array();
00663                                 // First check to see if we have fields for the primary key
00664                                 foreach ($table_desc['primary key'] as $k_field) {
00665                                         if (!empty($values[$k_field])) {
00666                                                 $match[$k_field] = $values[$k_field];
00667                                         }
00668                                 }
00669                                 // Otherwise check the fields that are part of the unique key
00670                                 if (empty($match)) {
00671                                         foreach ($table_desc['unique keys'] as $u_keys) {
00672                                                 foreach ($u_keys as $u_field) {
00673                                                         if (!empty($values[$u_field])) {
00674                                                                 $match[$u_field] = $values[$u_field];
00675                                                         }
00676                                                 }
00677                                         }
00678                                 }
00679                         }
00680                         if (!empty($match)) {
00681                                 // Now we need to check if it already exists via a select
00682                                 $results = tripal_core_chado_select($table, array_keys($table_desc['fields']), $match);
00683                                 // If not then insert
00684                                 if (empty($results)) {
00685                                         $options['statement_name'] = 'ins_'.$options['statement_name'];
00686                                         $record = tripal_core_chado_insert($table, $values, $options);
00687                                 }
00688                                 else {
00689                                   $options['return_record'] = TRUE;
00690                     $record = tripal_core_chado_update($table, $match, $values, $options);
00691                   }
00692           }
00693           else {
00694                 $msg = "\nLine " . $addt->line_num . ' ' . $table_data['record_id'] . ' (' .
00695         $table_data['mode'] . ') Unable to update record since none of the unique key or primary key fields were available ' .
00696         ' where values:' . print_r($values, TRUE);
00697 
00698         watchdog('T_bulk_loader', $msg, array(), WATCHDOG_ERROR);
00699         $data[$priority]['error'] = TRUE;
00700         $no_errors = FALSE;
00701           }
00702     }
00703     else {
00704       $record = tripal_core_chado_insert($table, $values, $options);
00705     }
00706 
00707     // if the insert was not successful
00708     if (!$record) {
00709       $msg = "\nLine " . $addt->line_num . ' ' . $table_data['record_id'] . ' (' .
00710         $table_data['mode'] . ') Unable to insert record into ' . $table .
00711         ' where values:' . print_r($values, TRUE);
00712 
00713       watchdog('T_bulk_loader', $msg, array(), WATCHDOG_ERROR);
00714       $data[$priority]['error'] = TRUE;
00715       $no_errors = FALSE;
00716     }
00717     // if the insert was succesful
00718     else {
00719 
00720       // if mode=insert_once then ensure we only insert it once
00721       if (preg_match('/insert_once/', $table_data['mode'])) {
00722         $default_data[$priority]['inserted'] = TRUE;
00723       }
00724 
00725       // add to tripal_bulk_loader_inserted
00726       if ($addt->node->keep_track_inserted) {
00727         $insert_record = db_fetch_object(db_query(
00728           "SELECT * FROM {tripal_bulk_loader_inserted} WHERE table_inserted_into='%s' AND nid=%d",
00729           $table,
00730           $addt->nid
00731         ));
00732         if ($insert_record) {
00733           $insert_record->ids_inserted .= ',' . $record[$table_desc['primary key'][0] ];
00734           drupal_write_record('tripal_bulk_loader_inserted', $insert_record, 'tripal_bulk_loader_inserted_id');
00735           //print 'Update: '.print_r($insert_record,TRUE)."\n";
00736           //return $no_errors;
00737         }
00738         else {
00739           $insert_record = array(
00740             'nid' => $addt->nid,
00741             'table_inserted_into' => $table,
00742             'table_primary_key' => $table_desc['primary key'][0],
00743             'ids_inserted' => $record[ $table_desc['primary key'][0] ],
00744           );
00745           //print 'New: '.print_r($insert_record,TRUE)."\n";
00746           $success = drupal_write_record('tripal_bulk_loader_inserted', $insert_record);
00747           //return $no_errors;
00748         }//end of if insert record
00749       }// end of if keeping track of records inserted
00750 
00751       // substitute the values array for the primary key if it exists
00752       // and is a single field
00753       if (array_key_exists('primary key', $table_desc)) {
00754         if (count($table_desc['primary key']) == 1) {
00755           $pkey_field = $table_desc['primary key'][0];
00756           $data[$priority]['values_array'] = $record[$pkey_field];
00757         }
00758       }
00759       else {
00760         //add changes back to values array
00761         $data[$priority]['values_array'] = $record;
00762         $values = $record;
00763       }
00764     } //end of if insert was successful
00765   }
00766   // perform a select
00767   else {
00768     // get the matches for this select
00769     $matches = array();
00770     if (is_array($values) and count($values) > 0) {
00771       $matches = tripal_core_chado_select($table, array_keys($table_desc['fields']), $values);
00772     }
00773     // if the record doesn't exist and it's not optional then generate an error
00774     if (count($matches) == 0) {
00775       // No record on select
00776       if ($table_data['select_optional'] != 1) {
00777         $msg = "\nLine " . $addt->line_num . ' ' . $table_data['record_id'] . ' (' . $table_data['mode'] . ') No Matching record in ' . $table . ' where values:' . print_r($values, TRUE);
00778         watchdog('T_bulk_loader', $msg, array(), WATCHDOG_ERROR);
00779         $data[$priority]['error'] = TRUE;
00780         $no_errors = FALSE;
00781       }
00782       // there is no match and select optional is turned on, so we want to set
00783       // the values to empty for any records with an FK relationship on this one
00784       else {
00785         $data[$priority]['values_array'] = NULL;
00786       }
00787     }
00788     // if we have more than one record matching and this select isn't optional then fail
00789     if (count($matches) > 1) {
00790       if ($table_data['select_optional'] != 1) {
00791         $msg = "\nLine " . $addt->line_num . ' ' . $table_data['record_id'] . ' (' . $table_data['mode'] . ') Too many matching records in ' . $table . ' where values:' . print_r($values, TRUE);
00792         watchdog('T_bulk_loader', $msg, array(), WATCHDOG_WARNING);
00793         $data[$priority]['error'] = TRUE;
00794         $no_errors = FALSE;
00795       }
00796       // there are too many matches and this is an optional select so set
00797       // the values to empty for any records with an FK relationship on this one
00798       else {
00799         $data[$priority]['values_array'] = NULL;
00800       }
00801     }
00802     // if mode=select_once then ensure we only select it once
00803     if (preg_match('/select_once/', $table_data['mode'])) {
00804       $default_data[$priority]['selected'] = TRUE;
00805 
00806       // save the pkey
00807       if (array_key_exists('primary key', $table_desc)) {
00808         $new_values = array();
00809         foreach ($matches[0] as $key => $value) {
00810           $new_values[$key] = $value;
00811         }
00812         $default_data[$priority]['values_default'] = $new_values;
00813       }
00814     }
00815   }
00816 
00817   return $no_errors;
00818 }
00819 
/**
 * Fills a record's values with the data found on the current input line.
 *
 * Every non-array entry of $values is looked up in $field2column. When the
 * field is mapped to a column, the entry is replaced with that column's
 * content, or removed when the column is missing or holds only whitespace.
 * Array entries (foreign key definitions, resolved by
 * tripal_bulk_loader_add_foreignkey_to_values()) and fields without a
 * positive column number are left untouched.
 *
 * @param $values
 *   Array of field name => value for the record being built.
 * @param $line
 *   Zero-indexed array holding the columns of the current input line.
 * @param $field2column
 *   Array of field name => 1-based column number.
 *
 * @return
 *   The updated $values array.
 */
function tripal_bulk_loader_add_spreadsheetdata_to_values($values, $line, $field2column) {
  foreach ($values as $field => $value) {
    if (is_array($value)) {
      continue;
    }

    // Column numbers are 1-based. A field that is not mapped (or mapped to a
    // blank/zero column) resolves to a negative index and keeps its current
    // value. The isset() and cast avoid undefined-index notices and the
    // non-numeric arithmetic error that newer PHP versions raise.
    $column = isset($field2column[$field]) ? ((int) $field2column[$field]) - 1 : -1;
    if ($column < 0) {
      continue;
    }

    // A line shorter than expected is treated exactly like an empty column.
    if (isset($line[$column]) && preg_match('/\S+/', $line[$column])) {
      $values[$field] = $line[$column];
    }
    else {
      unset($values[$field]);
    }
  }

  return $values;
}
00853 
/**
 * Resolves the foreign key placeholders in a record's values array.
 *
 * Any entry of $values that is an array is treated as a reference to another
 * record of the template (described under its 'foreign record' key) and is
 * replaced with a concrete value, or removed when no value can be found.
 *
 * @param $table_array
 *   Description of the current record; only its 'table' key is read here.
 * @param $values
 *   Array of field name => value (or foreign record definition).
 * @param $data
 *   Per-priority record data; the 'values_array' of the referenced record
 *   is read from it.
 * @param $record2priority
 *   Array mapping a record identifier to its priority (index into $data).
 * @param $nid
 *   Node id of the loading job; used to build a statement name.
 * @param $priority
 *   Priority of the current record; used to build a statement name.
 * @param $default_data
 *   Per-priority cached data; a 'values_default' entry, when present, is
 *   used instead of querying again.
 *
 * @return
 *   The $values array with foreign key entries substituted or removed.
 */
function tripal_bulk_loader_add_foreignkey_to_values($table_array, $values, $data, $record2priority, $nid,
  $priority, $default_data) {

  foreach ($values as $field => $value) {
    // Only array values describe a foreign record; scalars stay as they are.
    if (!is_array($value)) {
      continue;
    }

    // Identify the record, table and field this value refers to.
    $reference        = $value['foreign record'];
    $foreign_record   = $reference['record'];
    $foreign_priority = $record2priority[$foreign_record];
    $foreign_table    = $reference['table'];
    $foreign_field    = $reference['field'];

    // What the referenced record currently holds.
    $foreign_values = $data[$foreign_priority]['values_array'];

    // Case 1: a cached default for the referenced record already holds the
    // requested field, so reuse it.
    $has_default = array_key_exists($foreign_priority, $default_data)
      && array_key_exists('values_default', $default_data[$foreign_priority])
      && array_key_exists($foreign_field, $default_data[$foreign_priority]['values_default']);
    if ($has_default) {
      $values[$field] = $default_data[$foreign_priority]['values_default'][$foreign_field];
      continue;
    }

    // Case 2: the schema declares this field as a foreign key onto the
    // requested field of the foreign table, so the foreign record's values
    // can be passed through unchanged.
    $tbl_description = tripal_core_get_chado_table_schema($table_array['table']);
    $is_declared_fk = $tbl_description
      && array_key_exists('foreign keys', $tbl_description)
      && array_key_exists($foreign_table, $tbl_description['foreign keys'])
      && array_key_exists($field, $tbl_description['foreign keys'][$foreign_table]['columns'])
      && $foreign_field == $tbl_description['foreign keys'][$foreign_table]['columns'][$field];
    if ($is_declared_fk) {
      $values[$field] = $foreign_values;
      continue;
    }

    // Case 3: no declared relationship, so the single requested field has to
    // be selected from the foreign table.
    $fk_description = tripal_core_get_chado_table_schema($foreign_table);
    if (is_array($foreign_values)) {
      // The foreign record is still a set of field values: select with them.
      $fvalues = $foreign_values;
      $options = array('statement_name' => 'blk_' . $nid . $priority . $foreign_table);
    }
    elseif ($foreign_values) {
      // A non-array value is the primary key of the foreign record.
      $fvalues = array($fk_description['primary key'][0] => $foreign_values);
      $options = array('statement_name' => 'pk_' . $foreign_table);
    }
    else {
      // Nothing to look up with, so this field cannot have a value either.
      unset($values[$field]);
      continue;
    }

    $columns = array($foreign_field);
    $record  = tripal_core_chado_select($foreign_table, $columns, $fvalues, $options);
    if ($record) {
      $values[$field] = $record[0]->$foreign_field;
    }
    else {
      unset($values[$field]);
    }
  }

  // return the updated field values
  return $values;
}
00948 
/**
 * Applies the regular expression transformations defined for a record.
 *
 * For every field listed in $table_data['regex_transform'] the field's value
 * is passed through preg_replace() using the rule's 'pattern' and 'replace'
 * entries. Before that, any <#column:N#> token found in a replacement string
 * is substituted with the content of column N (1-based) of the current line.
 * A field whose transformed value is the empty string is removed.
 *
 * @param $values
 *   Array of field name => value for the record being built.
 * @param $table_data
 *   Record definition; only its 'regex_transform' entry is read, an array of
 *   field name => array('pattern' => ..., 'replace' => array(...)).
 * @param $line
 *   Zero-indexed array holding the columns of the current input line.
 *
 * @return
 *   The updated $values array.
 */
function tripal_bulk_loader_regex_tranform_values($values, $table_data, $line) {

  if (empty($table_data['regex_transform']) or
      !is_array($table_data['regex_transform'])) {
    return $values;
  }

  // Characters that preg_replace() treats specially inside a replacement
  // string. Column data must be inserted literally, so both are escaped;
  // otherwise text such as "$5.00" or "\1" in the input file would be read
  // as a back-reference and silently dropped.
  $escape = array('\\' => '\\\\', '$' => '\\$');

  foreach ($table_data['regex_transform'] as $field => $regex_array) {
    if (!is_array($regex_array) or
        !array_key_exists('replace', $regex_array) or
        !array_key_exists('pattern', $regex_array) or
        !is_array($regex_array['replace'])) {
      continue;
    }

    // Check for <#column:\d+#> notation
    // if present replace with that column in the current line
    foreach ($regex_array['replace'] as $key => $replace) {
      if (preg_match_all('/<#column:(\d+)#>/', $replace, $matches)) {
        $substitutions = array();
        foreach ($matches[1] as $k => $column_num) {
          $index = $column_num - 1;
          // A column missing from the line is substituted as an empty string.
          $column_value = isset($line[$index]) ? (string) $line[$index] : '';
          $substitutions[$matches[0][$k]] = strtr($column_value, $escape);
        }
        // Single pass: substituted column data is never re-scanned for tokens.
        $regex_array['replace'][$key] = strtr($replace, $substitutions);
      }
    }

    // do the full replacement; a field that is currently absent is
    // transformed as an empty string
    $old_value = isset($values[$field]) ? $values[$field] : '';
    $new_value = preg_replace($regex_array['pattern'], $regex_array['replace'], $old_value);
    $values[$field] = $new_value;

    if ($values[$field] === '') {
      unset($values[$field]);
    }
  }

  return $values;
}
00999 
/**
 * Builds a compact, single-line summary of a values array.
 *
 * Each entry is rendered as "key=>value" and the entries are joined with
 * ", ". An array value is rendered as "{key=>value,key=>value}" (one level
 * only). Any value longer than 20 characters is cut to its first 20
 * characters followed by "...".
 *
 * @param $values
 *   The array to summarise.
 *
 * @return
 *   The summary string.
 */
function tripal_bulk_loader_flatten_array($values) {
  $pairs = array();

  foreach ($values as $key => $item) {
    if (!is_array($item)) {
      // scalar: shorten it when it is too long
      $text = (drupal_strlen($item) > 20) ? drupal_substr($item, 0, 20) . '...' : $item;
    }
    else {
      // nested array: summarise its members inside braces
      $members = array();
      foreach ($item as $member_key => $member) {
        $shown = (drupal_strlen($member) > 20) ? drupal_substr($member, 0, 20) . '...' : $member;
        $members[] = $member_key . '=>' . $shown;
      }
      $text = '{' . implode(',', $members) . '}';
    }

    $pairs[] = $key . '=>' . $text;
  }

  return implode(', ', $pairs);
}
01028 
/**
 * Prints a one-line text progress bar for command-line runs.
 *
 * The line has the form "[|||||-----] 50.00%. (5 of 10) Memory: N" and ends
 * with a carriage return so the next call overwrites it. A "Progress:"
 * header is written to STDOUT when $current is 0 and a newline is printed
 * once $current equals $total.
 *
 * @param $current
 *   Number of items processed so far.
 * @param $total
 *   Total number of items. When this is not a positive number the bar is
 *   drawn as complete instead of dividing by zero.
 * @param $size
 *   Width of the bar; $size + 1 bar characters are printed.
 */
function tripal_bulk_loader_progress_bar($current=0, $total=100, $size=50) {
  $mem = memory_get_usage();

  // First iteration
  if ($current == 0) {
    fputs(STDOUT, "Progress:\n");
  }

  // Share of the work that is done. An empty job (nothing to process) is
  // shown as complete; dividing by a zero total would raise an error.
  $fraction = ($total > 0) ? $current / $total : 1;

  // Two decimals, left-padded with spaces to at least five characters so
  // the line keeps a constant width while the percentage grows.
  $percent = str_pad(sprintf("%.02f", round($fraction * 100, 2)), 5, ' ', STR_PAD_LEFT);

  // output the progress bar: completed positions are '|', the rest '-'
  $filled_up_to = $fraction * $size;
  echo '[';
  for ($place = 0; $place <= $size; $place++) {
    echo ($place <= $filled_up_to) ? '|' : '-';
  }
  echo ']';

  // end a bar with a percent indicator; "\r" returns the cursor to the
  // start of the line so the next bar is drawn over this one
  echo " $percent%. ($current of $total) Memory: $mem\r";

  // if it's the end, add a new line
  if ($current == $total) {
    echo "\n";
  }

}
01085 
/**
 * Records loading progress for a job in a plain-text progress file.
 *
 * The file is /tmp/tripal_bulk_loader_progress-<job_id>.out. A "." is
 * written for each record added and a newline when a line is complete. The
 * open handle is kept in the 'tripal_bulk_loader_progress_file_handle'
 * variable between calls.
 *
 * NOTE(review): a file handle cannot be serialized, so the stored variable
 * is presumably only meaningful within the current process - confirm before
 * relying on it elsewhere.
 *
 * @param $job_id
 *   Id of the job; used to build the progress file name.
 * @param $record_added
 *   TRUE to note that a record was added.
 * @param $line_complete
 *   TRUE to note that the current line is finished.
 * @param $close
 *   TRUE to close the file and forget the stored handle.
 */
function tripal_bulk_loader_progress_file_track_job($job_id, $record_added, $line_complete = FALSE, $close = FALSE) {
  // Report an unusable progress file only once per process; this function
  // is called for every record.
  static $open_failure_reported = FALSE;

  // retrieve the file handle
  $file_handle = variable_get('tripal_bulk_loader_progress_file_handle', NULL);

  // open file for writing if not already
  if (!is_resource($file_handle)) {
    $path = '/tmp/tripal_bulk_loader_progress-' . $job_id . '.out';
    $file_handle = fopen($path, 'w');

    // Progress tracking is best-effort: without a usable file there is
    // nothing to write to or close, so give up quietly after one warning.
    if (!$file_handle) {
      if (!$open_failure_reported) {
        watchdog('T_bulk_loader', 'Unable to open the progress file %file for writing.', array('%file' => $path), WATCHDOG_WARNING);
        $open_failure_reported = TRUE;
      }
      return;
    }

    variable_set('tripal_bulk_loader_progress_file_handle', $file_handle);
  }

  if ($record_added) {
    fwrite($file_handle, '.');
  }

  if ($line_complete) {
    fwrite($file_handle, "\n");
  }

  // close the file if finished
  if ($close) {
    fclose($file_handle);
    variable_set('tripal_bulk_loader_progress_file_handle', NULL);
  }
}
 All Classes Files Functions Variables