Skip to content

Commit b5b7a27

Browse files
committed
Multithreading implementation
1 parent 121493d commit b5b7a27

File tree

11 files changed

+319
-0
lines changed

11 files changed

+319
-0
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
'use strict';
2+
3+
const ActorSystem = require('../system');
4+
const nodemailer = require('nodemailer');
5+
const auth = require('../config');
6+
7+
const FROM = 'nodeua.com@gmail.com';
8+
9+
ActorSystem.register(class Mailer {
10+
constructor() {
11+
console.log('Start actor: Mailer');
12+
this.transport = nodemailer.createTransport({
13+
service: 'gmail', auth
14+
});
15+
}
16+
17+
message({ to, subject, message }) {
18+
const mail = { from: FROM, to, subject, text: message };
19+
this.transport.sendMail(mail, (error, data) => {
20+
if (error) console.log(error);
21+
else console.log(`Email sent: ${data.response}`);
22+
});
23+
}
24+
25+
exit() {
26+
this.transport.close();
27+
console.log('Stop actor: Mailer');
28+
}
29+
});
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
'use strict';
2+
3+
const ActorSystem = require('../system');
4+
const http = require('http');
5+
6+
const URL = 'http://localhost:8000/';
7+
const INTERVAL = 2000;
8+
9+
ActorSystem.register(class Monitoring {
10+
constructor() {
11+
console.log('Start actor: Monitoring');
12+
this.prevSuccess = true;
13+
this.timer = setInterval(() => {
14+
this.attempt(URL);
15+
}, INTERVAL);
16+
}
17+
18+
attempt(url) {
19+
http.get(url, res => {
20+
const success = res.statusCode === 200;
21+
this.notify({ url, success, status: res.statusCode });
22+
}).on('error', error => {
23+
this.notify({ url, success: false, status: error.message });
24+
});
25+
}
26+
27+
notify({ url, success, status }) {
28+
if (this.prevSuccess !== success) {
29+
this.prevSuccess = success;
30+
ActorSystem.send('Renderer', { url, success, status });
31+
}
32+
}
33+
34+
message() {}
35+
36+
exit() {
37+
clearInterval(this.timer);
38+
console.log('Stop actor: Monitoring');
39+
}
40+
});
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
'use strict';
2+
3+
const ActorSystem = require('../system');
4+
5+
ActorSystem.register(class Renderer {
6+
constructor() {
7+
console.log('Start actor: Renderer');
8+
}
9+
10+
message({ url, success, status }) {
11+
const to = 'nodeua.com@gmail.com';
12+
const msg = success ? 'is available again' : 'is not available';
13+
const date = new Date().toUTCString();
14+
const reason = (success ? 'Status code: ' : 'Error code: ') + status;
15+
const message = `Resource ${url} ${msg} (${date})\n${reason}`;
16+
const subject = 'Server Monitoring';
17+
ActorSystem.send('Mailer', { to, subject, message });
18+
}
19+
20+
exit() {
21+
console.log('Stop actor: Renderer');
22+
}
23+
});
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
'use strict';
2+
3+
const ActorSystem = require('../system');
4+
5+
ActorSystem.register(class Root {
6+
constructor() {
7+
console.log('Start actor: Root');
8+
ActorSystem.start('Monitoring');
9+
ActorSystem.start('Renderer');
10+
ActorSystem.start('Mailer', 3);
11+
}
12+
13+
message() {}
14+
15+
exit() {
16+
ActorSystem.stop('Monitoring');
17+
ActorSystem.stop('Renderer');
18+
ActorSystem.stop('Mailer');
19+
console.log('Stop actor: Root');
20+
}
21+
});

JavaScript/3-multithread/config.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
'use strict';
2+
3+
module.exports = {
4+
user: 'account-name@gmail.com',
5+
pass: 'your-password',
6+
};

JavaScript/3-multithread/main.js

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
'use strict';
2+
3+
const ActorSystem = require('./system.js');
4+
5+
const EXIT_NORMAL = 1000;
6+
const EXIT_ABNORMAL = 5000;
7+
8+
ActorSystem.start('Root');
9+
10+
process.on('SIGINT', () => {
11+
console.log('');
12+
ActorSystem.stop('Root');
13+
setTimeout(() => {
14+
console.log('Graceful shutdown');
15+
process.exit(0);
16+
}, EXIT_NORMAL);
17+
setTimeout(() => {
18+
console.log('Abnormal termination');
19+
process.exit(1);
20+
}, EXIT_ABNORMAL);
21+
});

JavaScript/3-multithread/master.js

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
'use strict';
2+
3+
const threads = require('worker_threads');
4+
const { Worker } = threads;
5+
6+
const actors = new Map();
7+
8+
class MasterSystem {
9+
static start(name, count = 1) {
10+
if (!actors.has(name)) {
11+
const ready = [];
12+
const instances = [];
13+
const queue = [];
14+
actors.set(name, { ready, instances, queue });
15+
}
16+
const { ready, instances } = actors.get(name);
17+
for (let i = 0; i < count; i++) {
18+
const actor = new Worker('./system.js');
19+
console.log({ actor });
20+
MasterSystem.subscribe(actor);
21+
ready.push(actor);
22+
instances.push(actor);
23+
actor.postMessage({ command: 'start', name });
24+
}
25+
}
26+
27+
static stop(name) {
28+
const record = actors.get(name);
29+
if (record) {
30+
const { instances } = record;
31+
for (const actor of instances) {
32+
actor.postMessage({ command: 'stop' });
33+
}
34+
}
35+
}
36+
37+
static send(name, data) {
38+
const record = actors.get(name);
39+
if (record) {
40+
const { ready, queue } = record;
41+
const actor = ready.shift();
42+
if (!actor) {
43+
queue.push(data);
44+
return;
45+
}
46+
actor.postMessage({ command: 'message', data });
47+
}
48+
}
49+
50+
static subscribe(actor) {
51+
actor.on('message', message => {
52+
const { command, name } = message;
53+
if (command === 'message') {
54+
const { data } = message;
55+
MasterSystem.send(name, data);
56+
return;
57+
}
58+
if (command === 'start') {
59+
const { count } = message;
60+
MasterSystem.start(name, count);
61+
return;
62+
}
63+
if (command === 'stop') {
64+
MasterSystem.stop(name);
65+
return;
66+
}
67+
if (command === 'ready') {
68+
const { id } = message;
69+
const record = actors.get(name);
70+
if (record) {
71+
const { ready, instances, queue } = record;
72+
for (const actor of instances) {
73+
if (actor.id === id) ready.push(actor);
74+
}
75+
if (queue.length > 0) {
76+
const next = queue.shift();
77+
MasterSystem.send(name, next);
78+
}
79+
}
80+
}
81+
});
82+
}
83+
}
84+
85+
module.exports = MasterSystem;

JavaScript/3-multithread/package-lock.json

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

JavaScript/3-multithread/package.json

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"name": "server-monitoring",
3+
"version": "1.0.0",
4+
"description": "Actor Model example application",
5+
"main": "main.js",
6+
"repository": {
7+
"type": "git",
8+
"url": "git+https://github.com/HowProgrammingWorks/ActorModel.git"
9+
},
10+
"author": "Timur Shemsedinov <timur.shemsedinov@gmail.com>",
11+
"license": "MIT",
12+
"bugs": {
13+
"url": "https://github.com/HowProgrammingWorks/ActorModel/issues"
14+
},
15+
"homepage": "https://github.com/HowProgrammingWorks/ActorModel#readme",
16+
"dependencies": {
17+
"nodemailer": "^6.1.1"
18+
}
19+
}

JavaScript/3-multithread/system.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
'use strict';
2+
3+
const threads = require('worker_threads');
4+
const { isMainThread } = threads;
5+
6+
module.exports = isMainThread ? require('./master') : require('./worker');

JavaScript/3-multithread/worker.js

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
'use strict';
2+
3+
const threads = require('worker_threads');
4+
const master = threads.parentPort;
5+
6+
class ActorSystem {
7+
static register(actor) {
8+
ActorSystem.actor = actor;
9+
}
10+
11+
static start(name, count = 1) {
12+
master.postMessage({ command: 'start', name, count });
13+
}
14+
15+
static stop(name) {
16+
master.postMessage({ command: 'stop', name });
17+
}
18+
19+
static send(name, data) {
20+
master.postMessage({ command: 'message', name, data });
21+
}
22+
}
23+
24+
ActorSystem.actor = null;
25+
ActorSystem.instance = null;
26+
27+
//process.on('SIGINT', () => {});
28+
29+
master.on('message', message => {
30+
const { command } = message;
31+
if (command === 'start') {
32+
const { name } = message;
33+
require(`./actors/${name.toLowerCase()}.js`);
34+
const ActorClass = ActorSystem.actor;
35+
ActorSystem.instance = new ActorClass();
36+
return;
37+
}
38+
if (command === 'stop') {
39+
const { instance } = ActorSystem;
40+
if (instance) instance.exit();
41+
process.exit(0);
42+
return;
43+
}
44+
if (command === 'message') {
45+
const { instance } = ActorSystem;
46+
if (instance) {
47+
const { data } = message;
48+
const { name } = ActorSystem.actor;
49+
instance.message(data);
50+
const msg = { command: 'ready', name, id: threads.threadId };
51+
master.postMessage(msg);
52+
}
53+
}
54+
});
55+
56+
module.exports = ActorSystem;

0 commit comments

Comments
 (0)