- Added new model system/JobTriggers_model

- Loads jqm configuration in controller system/jq/JobsQueueManager
- Added new table system.tbl_jobtriggers to system/dbupdate_3.3.php
- Added new privacy method _addNewTriggeredJobToQueue to JobsQueueLib
- JobsQueueLib is called by addNewJobsToQueue and updateJobsQueue
This commit is contained in:
Paolo
2020-03-12 18:51:26 +01:00
parent 8363f0d26c
commit 4d1dbdfae9
4 changed files with 144 additions and 11 deletions
@@ -26,6 +26,9 @@ class JobsQueueManager extends Auth_Controller
)
);
// Loading config file jqm
$this->config->load('jqm');
// Loads JobsQueueLib
$this->load->library('JobsQueueLib');
// Loads permission lib
+60 -4
View File
@@ -38,6 +38,7 @@ class JobsQueueLib
$this->_ci->load->model('system/JobsQueue_model', 'JobsQueueModel');
$this->_ci->load->model('system/JobTypes_model', 'JobTypesModel');
$this->_ci->load->model('system/JobStatuses_model', 'JobStatusesModel');
$this->_ci->load->model('system/JobTriggers_model', 'JobTriggersModel');
}
//------------------------------------------------------------------------------------------------------------------
@@ -92,17 +93,21 @@ class JobsQueueLib
$job->{self::PROPERTY_TYPE} = $type; // What you asked is what you get!
// Try to insert the single job into database
$dbResult = $this->_ci->JobsQueueModel->insert($job);
$dbNewJobResult = $this->_ci->JobsQueueModel->insert($job);
// If an error occurred during while inserting in database
if (isError($dbResult))
if (isError($dbNewJobResult))
{
$job->{self::PROPERTY_ERROR} = getError($dbResult); // retrieve the cause and store it in job object
$job->{self::PROPERTY_ERROR} = getError($dbNewJobResult); // retrieve the cause and store it in job object
$errorOccurred = true; // set error occurred flag
}
else // otherwise
{
$job->{self::PROPERTY_JOBID} = getData($dbResult); // get the jobid and store it in job object
$job->{self::PROPERTY_JOBID} = getData($dbNewJobResult); // get the jobid and store it in job object
$dbNewTriggeredJobResult = $this->_addNewTriggeredJobToQueue($type, $job, array(self::STATUS_NEW));
// If an error occurred during while inserting in database
if (isError($dbNewTriggeredJobResult)) return $dbNewTriggeredJobResult;
}
}
else // otherwise
@@ -137,6 +142,8 @@ class JobsQueueLib
// Loops through all the provided jobs
foreach ($results as $job)
{
// TODO: find if present in database or not!!!
// If the structure of the job object is valid
if ($this->_checkUpdateJobStructure($job) && $this->_checkJobStatus($job, $statuses))
{
@@ -153,6 +160,16 @@ class JobsQueueLib
$job->{self::PROPERTY_ERROR} = getError($dbResult); // retrieve the cause and store it in job object
$errorOccurred = true; // set error occurred flag
}
else // otherwise
{
$dbNewTriggeredJobResult = $this->_addNewTriggeredJobToQueue(
$type,
$job,
array($job->status)
);
// If an error occurred during while inserting in database
if (isError($dbNewTriggeredJobResult)) return $dbNewTriggeredJobResult;
}
}
else // otherwise
{
@@ -277,4 +294,43 @@ class JobsQueueLib
unset($job->{self::PROPERTY_CREATIONTIME});
unset($job->{self::PROPERTY_TYPE});
}
/**
* Add e new triggered job to the jobs queue
* NOTE:
* - In this method there are less checks compared to addNewJobsToQueue method because
* the new jobs that will be added are generate in this method
* - Job ids in this case are not returned, therefore the caller is not going to be informed about these new jobs
*/
private function _addNewTriggeredJobToQueue($type, $job, $triggeredStatuses)
{
// Get all the job trigggers for the given type and for the given statuses
$dbTriggersResult = $this->_ci->JobTriggersModel->getJobtriggersByTypeStatuses($type, $triggeredStatuses);
// If an error occurred while getting job triggers from database then return it
if (isError($dbTriggersResult)) return $dbTriggersResult;
if (hasData($dbTriggersResult)) // If triggers were retrieved
{
// The output of the trigging job is the input of the trigged job
$triggeredJobInput = null;
if (isset($job->{self::PROPERTY_OUTPUT})) $triggeredJobInput = $job->{self::PROPERTY_OUTPUT};
// For each trigger
foreach (getData($dbTriggersResult) as $trigger)
{
$triggeredJob = array(
self::PROPERTY_TYPE => $trigger->following_type, // the new type is the one defined in tbl_jobtriggers
self::PROPERTY_STATUS => self::STATUS_NEW, // new job status is new
self::PROPERTY_INPUT => $triggeredJobInput // new job input
);
// Try to insert the single job into database
$dbNewJob = $this->_ci->JobsQueueModel->insert($triggeredJob);
// If an error occurred during while inserting in database
if (isError($dbNewJob)) return $dbNewJob;
}
}
return success(); // if here then it was a success!
}
}
@@ -0,0 +1,32 @@
<?php
class JobTriggers_model extends DB_Model
{
/**
* Constructor
*/
public function __construct()
{
parent::__construct();
$this->dbTable = 'system.tbl_jobtriggers';
$this->pk = array('type', 'status', 'followingType');
$this->hasSequence = false;
}
/**
*
*/
public function getJobtriggersByTypeStatuses($type, $triggeredStatuses)
{
$query = 'SELECT jt.type,
jt.status,
jt.following_type
FROM system.tbl_jobtriggers jt
WHERE jt.type = ?
AND jt.status IN ?
ORDER BY jt.type, jt.status';
return $this->execQuery($query, array($type, $triggeredStatuses));
}
}
+49 -7
View File
@@ -3846,7 +3846,7 @@ if ($result = $db->db_query("SELECT 1 FROM pg_class WHERE relname = 'unq_idx_abl
if (!$result = @$db->db_query('SELECT 1 FROM system.tbl_jobstatuses LIMIT 1'))
{
$qry = 'CREATE TABLE system.tbl_jobstatuses (
status character varying NOT NULL
status character varying(64) NOT NULL
);
COMMENT ON TABLE system.tbl_jobstatuses IS \'All possible job statuses\';
@@ -3854,10 +3854,10 @@ if (!$result = @$db->db_query('SELECT 1 FROM system.tbl_jobstatuses LIMIT 1'))
ALTER TABLE ONLY system.tbl_jobstatuses ADD CONSTRAINT pk_jobstatuses PRIMARY KEY (status);
INSERT INTO system.tbl_jobstatuses(status) VALUES('new');
INSERT INTO system.tbl_jobstatuses(status) VALUES('running');
INSERT INTO system.tbl_jobstatuses(status) VALUES('done');
INSERT INTO system.tbl_jobstatuses(status) VALUES('failed');
INSERT INTO system.tbl_jobstatuses(status) VALUES(\'new\');
INSERT INTO system.tbl_jobstatuses(status) VALUES(\'running\');
INSERT INTO system.tbl_jobstatuses(status) VALUES(\'done\');
INSERT INTO system.tbl_jobstatuses(status) VALUES(\'failed\');
';
if (!$db->db_query($qry))
@@ -3882,7 +3882,7 @@ if (!$result = @$db->db_query('SELECT 1 FROM system.tbl_jobstatuses LIMIT 1'))
if (!$result = @$db->db_query('SELECT 1 FROM system.tbl_jobtypes LIMIT 1'))
{
$qry = 'CREATE TABLE system.tbl_jobtypes (
type character varying(256) NOT NULL,
type character varying(128) NOT NULL,
description text NOT NULL
);
@@ -3916,7 +3916,7 @@ if (!$result = @$db->db_query('SELECT 1 FROM system.tbl_jobsqueue LIMIT 1'))
{
$qry = 'CREATE TABLE system.tbl_jobsqueue (
jobid integer NOT NULL,
type character varying(256) NOT NULL,
type character varying(128) NOT NULL,
creationtime timestamp without time zone DEFAULT now(),
status character varying(64) NOT NULL,
input jsonb,
@@ -3978,6 +3978,47 @@ if (!$result = @$db->db_query('SELECT 1 FROM system.tbl_jobsqueue LIMIT 1'))
echo '<br>Granted privileges to <strong>vilesci</strong> on system.tbl_jobsqueue';
}
// Creates table system.tbl_jobtriggers if it doesn't exist and grants privileges
if (!$result = @$db->db_query('SELECT 1 FROM system.tbl_jobtriggers LIMIT 1'))
{
$qry = 'CREATE TABLE system.tbl_jobtriggers (
type character varying(128) NOT NULL,
status character varying(64) NOT NULL,
following_type character varying(128) NOT NULL
);
COMMENT ON TABLE system.tbl_jobtriggers IS \'Table to manage the job triggers\';
COMMENT ON COLUMN system.tbl_jobtriggers.type IS \'Job type\';
COMMENT ON COLUMN system.tbl_jobtriggers.status IS \'Job status\';
COMMENT ON COLUMN system.tbl_jobtriggers.following_type IS \'New job type\';
ALTER TABLE ONLY system.tbl_jobtriggers ADD CONSTRAINT pk_jobtriggers PRIMARY KEY (type, status, following_type);
ALTER TABLE ONLY system.tbl_jobtriggers ADD CONSTRAINT fk_jobtriggers_status FOREIGN KEY (status) REFERENCES system.tbl_jobstatuses(status) ON UPDATE CASCADE ON DELETE RESTRICT;
ALTER TABLE ONLY system.tbl_jobtriggers ADD CONSTRAINT fk_jobtriggers_type FOREIGN KEY (type) REFERENCES system.tbl_jobtypes(type) ON UPDATE CASCADE ON DELETE RESTRICT;
ALTER TABLE ONLY system.tbl_jobtriggers ADD CONSTRAINT fk_jobtriggers_following_type FOREIGN KEY (following_type) REFERENCES system.tbl_jobtypes(type) ON UPDATE CASCADE ON DELETE RESTRICT;
';
if (!$db->db_query($qry))
echo '<strong>system.tbl_jobtriggers: '.$db->db_last_error().'</strong><br>';
else
echo '<br>system.tbl_jobtriggers table created';
$qry = 'GRANT SELECT ON TABLE system.tbl_jobtriggers TO web;';
if (!$db->db_query($qry))
echo '<strong>system.tbl_jobtriggers: '.$db->db_last_error().'</strong><br>';
else
echo '<br>Granted privileges to <strong>web</strong> on system.tbl_jobtriggers';
$qry = 'GRANT SELECT, UPDATE, INSERT, DELETE ON TABLE system.tbl_jobtriggers TO vilesci;';
if (!$db->db_query($qry))
echo '<strong>system.tbl_jobtriggers: '.$db->db_last_error().'</strong><br>';
else
echo '<br>Granted privileges to <strong>vilesci</strong> on system.tbl_jobtriggers';
}
// *** Pruefung und hinzufuegen der neuen Attribute und Tabellen
echo '<H2>Pruefe Tabellen und Attribute!</H2>';
@@ -4238,6 +4279,7 @@ $tabellen=array(
"system.tbl_filters" => array("filter_id","app","dataset_name","filter_kurzbz","person_id","description","sort","default_filter","filter","oe_kurzbz","statistik_kurzbz"),
"system.tbl_jobsqueue" => array("jobid", "type", "creationtime", "status", "input", "output", "starttime", "endtime", "insertvon", "insertamum"),
"system.tbl_jobstatuses" => array("status"),
"system.tbl_jobtriggers" => array("type", "status", "following_type"),
"system.tbl_jobtypes" => array("type", "description"),
"system.tbl_phrase" => array("phrase_id","app","phrase","insertamum","insertvon","category"),
"system.tbl_phrasentext" => array("phrasentext_id","phrase_id","sprache","orgeinheit_kurzbz","orgform_kurzbz","text","description","insertamum","insertvon"),