View in color | License | Download script | History |
9-Oct 22:08 UTC
[0.069] 19.26k
[0.069] 19.26k
rugby.rREBOL [
Title: "Asynchronous Request Broker"
Date: 21-May-2001/16:31:58+2:00
Version: 1.0.0
File: %rugby.r
Author: "Maarten Koopmans"
Needs: "Command 2.0+ , Core 2.5+ , View 1.1+"
Purpose: {An asynchronous, high-performance, handler based, server framework and a rebol request broker...}
Email: %m--koopmans2--chello--nl
library: [
level: 'advanced
platform: none
type: 'tool
domain: [ldc other-net tcp]
tested-under: none
support: none
license: none
see-also: none
]
]
; Our high performing server framework
; Used by the request broker rugby
hipe-serv: context [
;The list of ports we wait/all for in our main loop
port-q: make block! copy []
;mapping of ports to objects containg additional info
object-q: make block! copy []
;our main server port
server-port: none
;the handler for server [ currently only rugby, you can imagine http etc...]
my-handler: none
;remove a port from our port list
port-q-delete: func [ target [port!]]
[
remove find port-q target
]
;insert a port in to our port list
port-q-insert: func [ target [port!]]
[
append port-q target
;append port-q make object! conn-object [ port: :target ]
]
;insert a port and its corresponding object
object-q-insert: func [ target /local o]
[
append object-q target
o: make object! [port: target handler: :my-handler user-data: none]
append object-q o
]
;remove a port and its corresponding object
object-q-delete: func [target [port!] ]
[
remove remove find object-q target
]
;initialize everything for a client connection on application level
start: func [ conn [port!]]
[
port-q-insert conn
object-q-insert conn
]
;clean up after a client connection
stop: func [ conn [port!] /local conn-object]
[
port-q-delete conn
error? try
[
conn-object: select object-q conn
close conn-object/port
object-q-delete conn
]
]
;intialize everything on network level (asynchronous)
init-conn-port: func [ conn [port!] ]
[
set-modes conn [ async-modes: [read write]]
start conn
]
;initialize our main server port
init-server-port: func [ p [port!] conn-handler [any-function!]]
[
server-port: p
append port-q server-port
;Increase the backlog for this server. 15 should be possible (default is 5)
;server-port/backlog: 15
my-handler: :conn-handler
open/direct server-port
]
;Process all ports that have received an event (called from our main loop, serve)
process-ports: func [ portz [block!] /local temp-obj]
[
foreach item portz
[
either (item = server-port)
[
init-conn-port first server-port
]
[
temp-obj: select object-q item
temp-obj/handler temp-obj
]
]
]
;Start serving. Do a blocking wait until there are events
serve: func [/local portz]
[
forever
[
portz: wait/all port-q
process-ports portz
]
]
]
;This object implements the server side of a request broker.
rugby-server: make hipe-serv [
;get the number of args of a function
nargs: func [
{The number of the function arguments}
f [any-function!]
] [
-1 + index? any [find first :f refinement! tail first :f]
]
;fill a string with zeros (used for message lengths etc.)
fill-0: func [ filly [string!] how-many [integer!] /local fills]
[
if how-many > length? filly
[
fills: how-many - length? filly
]
for filz 1 fills 1
[
insert filly "0"
]
return filly
]
;This varaiable is a block containg words that are allowed to be executed
exec-env: none
;Checks to see if a message's integrity is ok.
check-msg: func [ msg [any-block!]]
[
return ((checksum/secure mold second msg) = first msg)
]
;Create a message for on the wire transfer
compose-msg: func [msg [any-block!] /local f-msg chk ]
[
f-msg: copy []
;compute the checksum
chk: checksum/secure mold msg
insert/only f-msg copy msg
insert f-msg chk
;return the compressed message
return mold compress mold copy f-msg
]
;Extract a message that has been sent on the wire
decompose-msg: func [ msg [any-string!]]
[
return copy do decompress do msg
]
;Do a low-level write of a message
write-msg: func [msg [any-block!] dest [port!] /local length f-msg]
[
f-msg: compose-msg msg
length: fill-0 to-string length? f-msg 4
write-io dest length 4
write-io dest f-msg length? f-msg
]
;Execute a message. Only if the first word is in our exec-env varaiable
safe-exec: func [ statement [any-block!] env [any-block!] /local res n stm]
[
if found? (find env first statement)
[
n: nargs get to-get-word first statement
res: none
stm: copy/part statement (n + 1)
error? try [ res: do stm ]
return res
]
return copy {}
]
;High-level 'do' of a message
do-message: func [ msg [any-string!] /local f-msg res]
[
f-msg: decompose-msg msg
either check-msg f-msg
[
res: safe-exec second f-msg exec-env
return res
]
[
return none
]
]
;This is the rugby server-handler (my-handler in hipe-serv)
do-handler: func [ o /local msg ret size]
[
;this handler does its work in 3 parts
; 1) Read the message size
; 2) Read the message
; 3) do the message
; 1) and 2) may be done inmultiple steps because of the saync I/O
ret: copy []
if (none? o/user-data )
[ error? try
[
;read the first 4 bytes that contain the total message size
;message size =< 9999
size: copy {}
read-io o/port size 4
o/user-data: context [ task: copy {} rest: to-integer size]
unset 'size
]
return
]
;now try to read the rest of the message
either (error? try [ msg: copy {} read-io o/port msg to-integer
o/user-data/rest ])
[ return]
;Do what ever we asked
[
o/user-data/rest: o/user-data/rest - (length? msg)
either (o/user-data/rest = 0)
[
o/user-data/task: append o/user-data/task msg
append ret do-message o/user-data/task
write-msg ret o/port
stop o/port
return
]
[
o/user-data/task: append o/user-data/task msg
return
]
]
]
;Init our server according to our server port-spec and with rugby's do-handler
init-rugby: func [ port-spec [port!] x-env [any-block!]]
[
exec-env: copy x-env
init-server-port port-spec :do-handler
]
;Start serving
go: func []
[
serve
]
]
;Rugby's client side
rugby-client: context
[
;Again... fill with zeroes
fill-0: func [ filly [string!] how-many [integer!] /local fills]
[
if how-many > length? filly
[
fills: how-many - length? filly
]
for filz 1 fills 1
[
insert filly "0"
]
return filly
]
;Check for message integrity
check-msg: func [ msg [any-block!]]
[
return ((checksum/secure mold second msg) = first msg)
]
;Create a message for on the wire transmission
compose-msg: func [msg [any-block!] /local f-msg chk ]
[
f-msg: copy []
chk: checksum/secure mold msg
insert/only f-msg copy msg
insert f-msg chk
return mold compress mold copy f-msg
]
;Extract a message that has been transmitted on the wire
decompose-msg: func [ msg [any-string!]]
[
return copy do decompress do msg
]
;Write a message on the port
write-msg: func [msg [any-block!] dest [port!] /local length f-msg]
[
f-msg: compose-msg msg
length: fill-0 to-string length? f-msg 4
write-io dest length 4
write-io dest f-msg length? f-msg
]
;Do a high-level rexec
rexec: func [ msg [any-block!] /with p [port!] /local res dest]
[
either with
[
dest: p
]
[
;the default
dest: make port! tcp://localhost:8001
]
if error? e: try
[
open dest
write-msg msg dest
;Read the result. We throw the length (first 4 bytes away)
;because we just read everything (synchronously)
res: remove/part copy dest 4
close dest
res: decompose-msg res
either check-msg res
[
return copy second res
]
[
return none
]
return copy res
];try
[
return none
];if error? try
]
];context
;Client test function. Shows how easy it is to do a remote exec
client-test: does [ rugby-client/rexec [echo "Rugby is great!"] ]
;Sample echo function for the test-server
echo: func [a] [return enbase a]
;A server test function. Demonstrates how easy it is to use rugby-server.
server-test: does
[
rugby-server/init-rugby make port! tcp://:8001 [echo]
rugby-server/go
] Notes
|