2012年11月9日星期五

Job 相同 Unique Key 的处理

测试 1
<?php

$start_time = date('Y-m-d H:i:s');
echo "Start: " . $start_time . "\n";

$gmc= new GearmanClient();
$gmc->addServer();
$gmc->setStatusCallback("reverse_status");
$gmc->setCompleteCallback("reverse_complete");
$gmc->addTask("reverse", "Hi! - $start_time", null, 1);


if (! $gmc->runTasks()) {
    echo "ERROR " . $gmc->error() . "\n";
    exit;
}


function reverse_status($task) {

    echo "STATUS: " . $task->unique() . ", " .
    $task->jobHandle() . " - " .
    $task->taskNumerator() .  "/" .
    $task->taskDenominator() . "\n";
}

function reverse_complete($task) {
    echo "COMPLETE: " . $task->unique() . ", " . $task->data() . "\n";
}
?>
<?php
$gmworker= new GearmanWorker();
$gmworker->addServer();

$gmworker->addFunction("reverse", "reverse_fn");

$flag = true;

while($flag){
    $flag = $gmworker->work();
}


function reverse_fn($job) {
    echo "Received job: " . $job->handle() . "\n";
    $workload = $job->workload();

    $workload_size = $job->workloadSize();
    echo "Workload: $workload ($workload_size)\n";

    # This status loop is not needed, just showing how it works

    for ($x= 0; $x < $workload_size; $x++) {
        echo "Sending status: " . ($x + 1) . "/$workload_size complete\n";

        $job->sendStatus($x+1, $workload_size);
        $job->sendData(substr($workload, $x, 1));
        sleep(1);
    }


    $result= strrev($workload);

    echo "Result: $result\n";

    # Return what we want to send back to the client.

    return $result;
}
?>
开启三个终端,第一个终端运行 worker,另外两个终端运行 client:
[root@www test]# php w-unique.php 
Received job: H:www:641
Workload: Hi! - 2012-11-08 11:01:00 (25)
Sending status: 1/25 complete
Sending status: 2/25 complete
Sending status: 3/25 complete
Sending status: 4/25 complete
Sending status: 5/25 complete
Sending status: 6/25 complete
Sending status: 7/25 complete
Sending status: 8/25 complete
Sending status: 9/25 complete
Sending status: 10/25 complete
Sending status: 11/25 complete
Sending status: 12/25 complete
Sending status: 13/25 complete
Sending status: 14/25 complete
Sending status: 15/25 complete
Sending status: 16/25 complete
Sending status: 17/25 complete
Sending status: 18/25 complete
Sending status: 19/25 complete
Sending status: 20/25 complete
Sending status: 21/25 complete
Sending status: 22/25 complete
Sending status: 23/25 complete
Sending status: 24/25 complete
Sending status: 25/25 complete
Result: 00:10:11 80-11-2102 - !iH

[root@www test]# php c-unique.php 
Start: 2012-11-08 11:01:00
STATUS: 1, H:www:641 - 1/25
STATUS: 1, H:www:641 - 2/25
STATUS: 1, H:www:641 - 3/25
STATUS: 1, H:www:641 - 4/25
STATUS: 1, H:www:641 - 5/25
STATUS: 1, H:www:641 - 6/25
STATUS: 1, H:www:641 - 7/25
STATUS: 1, H:www:641 - 8/25
STATUS: 1, H:www:641 - 9/25
STATUS: 1, H:www:641 - 10/25
STATUS: 1, H:www:641 - 11/25
STATUS: 1, H:www:641 - 12/25
STATUS: 1, H:www:641 - 13/25
STATUS: 1, H:www:641 - 14/25
STATUS: 1, H:www:641 - 15/25
STATUS: 1, H:www:641 - 16/25
STATUS: 1, H:www:641 - 17/25
STATUS: 1, H:www:641 - 18/25
STATUS: 1, H:www:641 - 19/25
STATUS: 1, H:www:641 - 20/25
STATUS: 1, H:www:641 - 21/25
STATUS: 1, H:www:641 - 22/25
STATUS: 1, H:www:641 - 23/25
STATUS: 1, H:www:641 - 24/25
STATUS: 1, H:www:641 - 25/25
COMPLETE: 1, 00:10:11 80-11-2102 - !iH
[root@www test]# php c-unique.php
Start: 2012-11-08 11:01:02
STATUS: 1, H:www:641 - 3/25
STATUS: 1, H:www:641 - 4/25
STATUS: 1, H:www:641 - 5/25
STATUS: 1, H:www:641 - 6/25
STATUS: 1, H:www:641 - 7/25
STATUS: 1, H:www:641 - 8/25
STATUS: 1, H:www:641 - 9/25
STATUS: 1, H:www:641 - 10/25
STATUS: 1, H:www:641 - 11/25
STATUS: 1, H:www:641 - 12/25
STATUS: 1, H:www:641 - 13/25
STATUS: 1, H:www:641 - 14/25
STATUS: 1, H:www:641 - 15/25
STATUS: 1, H:www:641 - 16/25
STATUS: 1, H:www:641 - 17/25
STATUS: 1, H:www:641 - 18/25
STATUS: 1, H:www:641 - 19/25
STATUS: 1, H:www:641 - 20/25
STATUS: 1, H:www:641 - 21/25
STATUS: 1, H:www:641 - 22/25
STATUS: 1, H:www:641 - 23/25
STATUS: 1, H:www:641 - 24/25
STATUS: 1, H:www:641 - 25/25
COMPLETE: 1, 00:10:11 80-11-2102 - !iH
上面是三个终端运行的结果。terminal-2, terminal-3 提交 Job 到 Gearmand Server,terminal-1 做为 worker,执行提交到 Geramand Server 所有为 reverse 的 Job。terminal-2, terminal-3 开始时间不同,但结束时间相同。 Unique ID 相同,先前提交的 Job 没有执行完时,Gearmand 会将同 Unique Key 的 Job 合并,即它们会同时收到相同的执行情况信息:
  1. terminal-2 提交 Job A , worker 接到调度信息立即对 Job A 进行处理。
  2. 在 termial-3 提交相同 Unique Key 的 Job B,两者提交的时差为 2s。
  3. 因为 reverse worker 的运行时间 >= 25s ,所以 Job B 提交时 Job A 实际还处于执行,Gearmand Server 将 Job B 合到 Job A 一起。
  4. terminal-3 收到这时和 terminal-2 相同的 Job Status。
简单理解的话,Gearmand Server 会将 Unique Key 做为每一个 Job 的标识,当提交的 Job 的 Unique Key 相同时,服务器将其第 N (N > 1)个提交的 Job 视为重复提交,直接忽略,不对其进行任何处理。这些逻辑都只针对于当前在队列中的 Job,已经执行完毕的 Job 不会对其进行这样的处理。
测试 2
<?php
$gmc= new GearmanClient();
$gmc->addServer();

for ($i = 0; $i < 11; $i++) {
    $gmc->doBackground("reverse", "Hi! - $i", 1);
}

for ($i = 0; $i < 11; $i++) {
    $gmc->addTaskBackground("reverse", "Hi! - $i", null, 2);
}


if (! $gmc->runTasks()) {
    echo "ERROR " . $gmc->error() . "\n";
    exit;
}
?>
修改 Client,将提交的 Jobs/Tasks 改为是后台处理。假设 Gearmand Server 采用了 MySQL 的持久存储,不启动 reverse worker,只运行上述的 Client 脚本,查询数据库:
mysql> SELECT * FROM gearman_queue;
+------------+---------------+----------+----------+-------------+
| unique_key | function_name | priority | data     | when_to_run |
+------------+---------------+----------+----------+-------------+
| 1          | reverse       |        1 | Hi! - 0  |           0 |
| 2          | reverse       |        1 | Hi! - 10 |           0 |
+------------+---------------+----------+----------+-------------+
2 rows in set (0.04 sec)
从上面得知,尽管 Unqiue Key 为 12 的都提交了 10 次,而队列中对应的记录却都只有一条。也就说明,当 Unique Key 相同时,Gearmand Server 对提交的 Job/Tasks 做了合并处理。
说明:
  • 当然,从Gearmand 队列持久化其表结构中可知,unique_key 做为主键,必须是唯一的,而该值对应 Job 提交时的 Unqiue Key。即在 Unique Key 相同时,在数据库中,记录必定无法添加,即上述 例 1 所要说明的原因 (这里实际上还有点疑问, MySQL/Drizzle 和 sqlite3 的表结构竟然不同,MySQL/Drizzle 表结构竟然是在 unique_key 和 function_name 上做唯一的索引,而 unqiue_key 也没做主键,难道实际中有同 unique_key 而不同 function_name 这样的用法?而且从 MySQL 性能方面来讲,不设置主键似乎也说不过去;如果 engine 采用 InnoDB,内部还是会给建一主键。)。
    CREATE TABLE gearman_queue (    
    unique_key VARCHAR(64),    
    function_name VARCHAR(255),    
    priority INT,    
    data LONGBLOB,    
    when_to_run INT,    
    unique key (unique_key, function_name))    
    
    CREATE TABLE gearman_queue (
    unique_key TEXT PRIMARY KEY,
    function_name TEXT,
    priority INTEGER,
    data BLOB,
    when_to_run INTEGER)
    
  • 在同一个进程里,addTask/addTaskBackground 提交 Tasks 时,因为都是在调用 $gmc->runTasks() 后,所有这个进程里需提交的 Tasks 才提交到 Gearmand Server。而在提交时,如果所有 Tasks 的 Unique Key 相同,后面 Task 调用的参数数据会覆盖前面调用的。 所以,也就有上述查询中 unique_key 2 时处理的数据是 Hi! - 10

没有评论:

发表评论