# Perl: Parallel Processing - by Eun Bae Kim (08/26/2018)
  
use strict;
use warnings;
use threads;
use Thread::Queue;
use threads::shared;


# Work queue that every worker thread pulls its tasks from.
my $oQue = Thread::Queue->new;

# Upper bound on the number of worker threads; lowered further down when
# there are fewer tasks than this.
my $iThreadMax = 5;

# Number of tasks placed on the queue. Starts at 0 (not undef) so that an
# empty task list still gives a numeric count for the comparison and the
# "Total No. of Threads" report below.
my $iTargetCnt = 0;

my @aTaskList = ("aaa", "bbb", "ccc", "ddd", "eee", "fff", 
                 "ggg", "hhh", "iii", "jjj", "kkk", "lll", "mmm", 
                 "nnn", "ooo", "ppp", "qqq", "rrr", "sss", 
                 "ttt", "uuu", "vvv", "www", "xxx", "yyy", "zzz",
                 "nan", "obo", "ppc", "dqq", "err", "fss", 
                 "tuv", "uuu", "gvv", "wxy", "xxx", "yyy", "zzz",
                 "nnn", "opq", "ppp", "qqq", "rrr", "sss", 
                 "ttt", "uuu", "vvv", "www", "xyz", "yyy", "last",
                 );

# Data sharing between threads. share() only shares the top level of a
# container, so only single-level hashes can be used this way.
my %hTest = (); share(%hTest);        # written by the workers, read at the end
my $sTest = ""; share($sTest);        # written by the workers, read at the end

# Put every task on the queue before any worker is started.
print "--------------------------------------------\n";
print "Adding Tasks to a Que......\n";
foreach my $sCurTask (@aTaskList) {
	$iTargetCnt++;
	print "Target enrolled in a que: ".$sCurTask."\n";
	$oQue->enqueue($sCurTask);
}

# Never start more workers than there are tasks.
if ($iTargetCnt < $iThreadMax) {
	$iThreadMax = $iTargetCnt;
}
print "--------------------------------------------\n";
print "Total No. of Threads: $iThreadMax\n";
print "--------------------------------------------\n";



# Start the workers. Each one runs funcDoIt independently.
for my $i (0 .. $iThreadMax - 1) {
	print "New Thread Count: ".$i." --> Generated!!\n";

	# One undef per worker, queued behind all real tasks: a worker's
	# dequeue loop ends when it pulls one of these off the queue.
	$oQue->enqueue(undef);

	# Main function that handles the tasks; extra parameters may be passed.
	threads->new(\&funcDoIt, $i, "b", "c");
}

# Wait for every worker to finish before continuing.
foreach my $oCurTask (threads->list()) {
	print "Joining Task: ".$oCurTask."\n";
	$oCurTask->join();
}


# funcDoIt($iPara1, $iPara2, $iPara3)
#
# Body of one worker thread. Prints its three parameters, then takes tasks
# off the file-level queue $oQue until it dequeues undef. For each task it
# reports whether the task string occurs in a fixed sample string, and
# records the task in the shared %hTest and $sTest.
#
#   $iPara1 - label printed in front of every result line (the caller
#             passes the worker's index)
#   $iPara2 - extra parameter, only printed
#   $iPara3 - extra parameter, only printed
#
# Returns nothing.
sub funcDoIt {
	my ($iPara1, $iPara2, $iPara3) = @_;

	print "Function Started...............................\n";
	print $iPara1."\n";
	print $iPara2."\n";
	print $iPara3."\n";

	my $sStr = "abcdeeefghijkkkklmnopqrstuuuuvwxyzzzz";

	# Test with defined(): only an undef ends the loop. A plain truth test
	# would also stop on a task of "0" or "", leaving the rest of the queue
	# unprocessed by this worker.
	while (defined(my $sCurTask = $oQue->dequeue())) {
		# Literal substring search. Tasks are plain strings, not patterns:
		# interpolating them into a regex would misread metacharacters and
		# an empty pattern would silently reuse the last successful match.
		if (index($sStr, $sCurTask) >= 0) {
			print $iPara1."th Thread\t".$sCurTask.": Found!!!\n";
		} else {
			print $iPara1."th Thread\t".$sCurTask.": Not Found!!!\n";
		}

		# Record the task in the variables shared with the main thread.
		$hTest{$sCurTask} = $sCurTask;
		$sTest = $sCurTask;

		threads->yield();  # Give other threads a chance to run
	}

	return;
}


# Final report: every distinct task name recorded in %hTest in sorted
# order, followed by the value left in $sTest.
print "--------------------------------------------\n";
print "Task: $_\n" for sort keys %hTest;
print "Last Task: $sTest\n";
print "End\n";

# Refer to the following link for more information.
# https://perldoc.perl.org/Thread/Queue.html
# https://perldoc.perl.org/threads/shared.html
# https://perldoc.perl.org/5.8.8/threads/shared.html