View in color | License | Download script | History |
30-Apr 16:14 UTC
[0.067] 27.365k
[0.067] 27.365k
REBOL [
Title: "Rugby client and server"
Date: 28-May-2001/16:31:58+2:00
Version: 2.0.0.0
File: %rugby_server.r
Author: "Maarten Koopmans"
Needs: "Command 2.0+ , Core 2.5+ , View 1.1+"
Purpose: {A high-performance, handler based, server framework and a rebol request broker...}
Email: %m--koopmans2--chello--nl
library: [
level: 'advanced
platform: none
type: 'tool
domain: [ldc other-net tcp]
tested-under: none
support: none
license: none
see-also: none
]
]
; Our high performing server framework
; Used by the request broker rugby
hipe-serv: make object! [
    ; Generic, handler-driven TCP server loop. One listen port plus any number
    ; of client ports are multiplexed with wait/all; every client port is
    ; paired with a small connection object that carries the handler to call
    ; and a free-form user-data slot for that handler's state.

    ; Ports passed to wait/all in the main loop.
    port-q: make block! 20
    ; Flat list of port / connection-object pairs (looked up with select).
    object-q: make block! 40
    ; The listening port, set by init-server-port.
    server-port: none
    ; Handler function copied into every new connection object.
    my-handler: none
    ; Addresses allowed to connect once restrict is switched on.
    restricted-server: make block! 20
    ; Is the allow-list being enforced?
    restrict: no

    restrict-to: func [
        {Enable access restriction and add R to the list of allowed addresses.}
        r [any-block!] {Addresses that may connect}
    ] [
        restrict: yes
        append restricted-server r
    ]

    allow?: func [
        {True when IP is present in the allow-list.}
        ip [tuple!]
    ] [
        return found? find restricted-server ip
    ]

    port-q-delete: func [
        {Remove TARGET from the wait list; does nothing when it is not listed.}
        target [port!]
        /local pos
    ] [
        ; guard the lookup so a missing port is never handed to remove
        if pos: find port-q target [remove pos]
    ]

    port-q-insert: func [
        {Add TARGET to the wait list.}
        target [port!]
    ] [
        append port-q target
    ]

    object-q-insert: func [
        {Create the connection object for TARGET and register the pair.}
        target
        /local o
    ] [
        o: make object! [port: target handler: :my-handler user-data: none]
        append object-q target
        append object-q o
    ]

    object-q-delete: func [
        {Remove TARGET and its connection object; does nothing when absent.}
        target [port!]
        /local pos
    ] [
        ; the port and its object are adjacent, so drop both in one step
        if pos: find object-q target [remove/part pos 2]
    ]

    start: func [
        {Register a freshly accepted client connection.}
        conn [port!]
    ] [
        port-q-insert conn
        object-q-insert conn
    ]

    stop: func [
        {Unregister and close a client connection.}
        conn [port!]
    ] [
        port-q-delete conn
        ; The connection object's port field is CONN itself (see
        ; object-q-insert), so close it directly. A failing close must not
        ; prevent the bookkeeping below from running, otherwise the
        ; object-q entry would stay behind forever.
        error? try [close conn]
        object-q-delete conn
    ]

    init-conn-port: func [
        {Admit CONN when unrestricted or allow-listed, otherwise close it.}
        conn [port!]
    ] [
        either any [not restrict allow? conn/remote-ip] [
            start conn
        ] [
            close conn
        ]
        exit
    ]

    init-server-port: func [
        {Install P as the listen port and CONN-HANDLER as the client handler.}
        p [port!]
        conn-handler [any-function!]
    ] [
        server-port: p
        append port-q server-port
        ; Increase the backlog for this server. 15 should be possible (default is 5)
        server-port/backlog: 15
        my-handler: :conn-handler
        open/direct/no-wait server-port
    ]

    process-ports: func [
        {Process all ports that have events}
        portz [block!] {The port list}
        /local temp-obj
    ] [
        foreach item portz [
            either item = server-port [
                ; an event on the listen port: take the next connection from it
                init-conn-port first server-port
            ] [
                ; a port without a registered object is skipped instead of
                ; dereferencing none
                if temp-obj: select object-q item [
                    ; a failing handler costs only its own connection, not
                    ; the whole server loop
                    if error? try [temp-obj/handler temp-obj] [stop item]
                ]
            ]
        ]
    ]

    serve: func [
        {Run the server loop: block in wait/all, then dispatch. Never returns.}
    ] [
        forever [process-ports wait/all port-q]
    ]
]
;This object implements the server side of a request broker.
rugby-server: make hipe-serv [
    ; Server side of the request broker. Each request on the wire is an
    ; 8-digit, zero-padded decimal length followed by that many bytes of
    ; message (see compose-msg for the message layout). The reply uses the
    ; same framing.

    nargs: func [
        {The number of the function arguments}
        f [any-function!]
    ] [
        ; position of the first refinement in the function's word list
        ; (or of the list's tail when there is none), minus one
        -1 + index? any [find first :f refinement! tail first :f]
    ]

    fill-0: func [
        {Left-pad FILLY in place with "0" up to HOW-MANY characters; returns FILLY.}
        filly [string!]
        how-many [integer!]
    ] [
        if how-many > length? filly [
            insert/dup filly "0" (how-many - length? filly)
        ]
        return filly
    ]

    ; Block of words that remote callers are allowed to execute.
    exec-env: none

    check-msg: func [
        {True when the checksum stored first in MSG matches its payload.}
        msg [any-block!] {[checksum payload]}
    ] [
        return ((checksum/secure mold second msg) = first msg)
    ]

    compose-msg: func [
        {Wrap MSG as [checksum payload] and return it molded and compressed.}
        msg [any-block!]
    ] [
        return mold compress mold reduce [checksum/secure mold msg copy msg]
    ]

    decompose-msg: func [
        {Reverse of compose-msg: turn wire text back into [checksum payload].}
        msg [any-string!]
    ] [
        ; NOTE(review): both DO calls evaluate text received from the
        ; network, before any checksum or exec-env check has run. A peer can
        ; therefore have arbitrary REBOL evaluated here. Replacing DO with
        ; LOAD looks equivalent for well-formed messages -- confirm and
        ; switch.
        return copy do decompress do msg
    ]

    write-msg: func [
        {Write up to 16000 bytes of MSG. Returns true when nothing is left to send (or the write failed), else the number of bytes written.}
        msg
        dest [port!]
        /local length chunk
    ] [
        chunk: min 16000 length? msg
        length: write-io dest msg chunk
        ; A negative result is treated as "port is gone" for every message
        ; size, so callers never receive a negative byte count.
        if 0 > length [return true]
        if length = length? msg [return true]
        return length
    ]

    safe-exec: func [
        {Evaluate STATEMENT only when its first word is listed in ENV. Returns the result, none when evaluation fails, or an empty string when refused.}
        statement [any-block!]
        env [any-block!]
        /local res n stm
    ] [
        ; refuse empty statements and anything not headed by a plain word
        if not all [
            not empty? statement
            word? first statement
            find env first statement
        ] [return copy {}]
        res: none
        error? try [
            n: nargs get first statement
            ; keep only the function word and as many values as it has arguments
            stm: copy/part statement (n + 1)
            ; NOTE(review): the argument values are evaluated by DO as well,
            ; so a paren! or word! argument sent by the client is evaluated
            ; on the server -- confirm whether arguments need sanitising.
            res: do stm
        ]
        return res
    ]

    do-message: func [
        {Decode MSG, verify its checksum and execute it. Returns the result, or none when the message is malformed or fails the check.}
        msg [any-string!]
        /local f-msg res
    ] [
        res: none
        ; a malformed message yields none instead of an error in the server loop
        error? try [
            f-msg: decompose-msg msg
            if check-msg f-msg [res: safe-exec second f-msg exec-env]
        ]
        return res
    ]

    do-handler: func [
        {Connection handler: advances one connection through read-size, read-message, execute and write-reply, a step per call.}
        o {Connection object with port and user-data fields}
        /local msg ret size size-read needed wr-res
    ] [
        ; Stages, driven by the state kept in o/user-data:
        ;   1) read the 8-byte message size (user-data is a string buffer)
        ;   2) read the message body        (user-data is a state object)
        ;   3) execute it and build the reply
        ;   4) write the reply length, then the reply itself
        ; 1), 2) and 4) may each take several calls because I/O is non-blocking.
        if none? o/user-data [o/user-data: copy {}]

        ; Stage 1: still collecting the size header.
        if not object? o/user-data [
            error? try [
                ; message size =< 99.999.999
                needed: 8 - length? o/user-data
                msg: copy/part o/port needed
                either needed = length? msg [
                    size: append copy o/user-data msg
                    ; msg held header bytes only; emptied, it seeds the task buffer
                    clear msg
                    o/user-data: context [
                        task: copy msg
                        rest: to-integer size
                        ret-val: copy {}
                        msg-read: false
                        ret-val-written: false
                        task-completed: false
                        header-written: false
                        header-length: copy "0"
                    ]
                ] [
                    append o/user-data msg
                ]
            ]
            exit
        ]

        ; Stage 2: read (more of) the message body.
        if not o/user-data/msg-read [
            if error? try [
                size-read: length? msg: copy/part o/port o/user-data/rest
            ] [exit]
            if 0 = size-read [exit]
            append o/user-data/task msg
            o/user-data/rest: o/user-data/rest - size-read
            if o/user-data/rest = 0 [o/user-data/msg-read: true]
            exit
        ]

        ; Stage 3: run the task and prepare the reply and its length header.
        if not o/user-data/task-completed [
            ret: copy []
            append ret do-message o/user-data/task
            o/user-data/ret-val: compose-msg ret
            o/user-data/header-length: fill-0 to-string length? o/user-data/ret-val 8
            o/user-data/task-completed: true
        ]

        ; Stage 4a: write out the header (length of what follows).
        if not o/user-data/header-written [
            wr-res: write-msg o/user-data/header-length o/port
            either logic? wr-res [
                o/user-data/header-written: true
            ] [
                ; drop what was sent; the remainder goes out on a later call
                remove/part o/user-data/header-length wr-res
            ]
            exit
        ]

        ; Stage 4b: write out the reply in batches, then end the connection.
        if not o/user-data/ret-val-written [
            wr-res: write-msg o/user-data/ret-val o/port
            either logic? wr-res [
                o/user-data/ret-val-written: true
                stop o/port
            ] [
                remove/part o/user-data/ret-val wr-res
            ]
            exit
        ]
    ]

    init-rugby: func [
        {Set the executable word list and open the listen port with do-handler.}
        port-spec [port!]
        x-env [any-block!] {Words remote callers may execute}
    ] [
        exec-env: copy x-env
        init-server-port port-spec :do-handler
    ]

    go: func [
        {Start serving}
    ] [
        serve
    ]
]
serve: func [
    {Exposes a set of commands as a remote service}
    commands [block!] {The commands to expose}
    /with {Expose on a different port than tcp://:8001}
    p [port!] {The port spec}
    /restrict {Restrict access to a block of ip numbers}
    r [block!] {The block of Ip numbers that have access to this service}
] [
    ; install the allow-list first so it is in force before the port opens
    if restrict [rugby-server/restrict-to r]
    ; listen on the caller's port when given, otherwise on the default one
    rugby-server/init-rugby (either with [p] [make port! tcp://:8001]) commands
    rugby-server/serve
]
; Sample command: prints the enbase-encoded form of E and returns it.
echo: func [e [string!]] [
    print e: enbase e
    return e
]
; Type in your console
;serve [echo]
;Rugby's client side
rugby-client: make object! [
    ; Client side of the request broker. Requests and replies are framed as
    ; an 8-digit, zero-padded decimal length followed by the message text
    ; produced by compose-msg.

    ; Flat list of index / holder-object pairs for deferred requests.
    deferred-ports: make block! 10
    ; Last index handed out for a deferred request.
    deferred-index: 0

    fill-0: func [
        {Left-pad FILLY in place with "0" up to HOW-MANY characters; returns FILLY.}
        filly [string!]
        how-many [integer!]
    ] [
        if how-many > length? filly [
            insert/dup filly "0" (how-many - length? filly)
        ]
        return filly
    ]

    check-msg: func [
        {True when the checksum stored first in MSG matches its payload.}
        msg [any-block!] {[checksum payload]}
    ] [
        return ((checksum/secure mold second msg) = first msg)
    ]

    compose-msg: func [
        {Wrap MSG as [checksum payload] and return it molded and compressed.}
        msg [any-block!]
    ] [
        return mold compress mold reduce [checksum/secure mold msg copy msg]
    ]

    decompose-msg: func [
        {Reverse of compose-msg: turn wire text back into [checksum payload].}
        msg [any-string!]
    ] [
        ; NOTE(review): DO evaluates text received from the peer before the
        ; checksum is verified; LOAD looks equivalent for well-formed
        ; messages -- confirm and switch.
        return copy do decompress do msg
    ]

    write-msg: func [
        {Send MSG on DEST: 8-digit length, the composed message, then one extra byte.}
        msg [any-block!]
        dest [port!]
        /local length f-msg
    ] [
        f-msg: compose-msg msg
        length: fill-0 to-string length? f-msg 8
        write-io dest length 8
        write-io dest f-msg length? f-msg
        ; NOTE(review): one byte beyond the framed message is sent on
        ; purpose in the original; presumably it keeps the server's port
        ; signalled so its handler is invoked again after the body has been
        ; read -- confirm before removing.
        write-io dest length 1
    ]

    rexec: func [
        {Send MSG for remote execution. Returns the result block, true (/oneway), an index (/deferred), or none on failure.}
        msg [any-block!]
        /with p [port!]
        /oneway
        /deferred
        /local res dest holder
    ] [
        ; the default destination is the local server
        dest: either with [p] [make port! tcp://localhost:8001]
        if error? try [
            open dest
            write-msg msg dest
            if not any [oneway deferred] [
                ; Read everything synchronously and throw the first 8 bytes
                ; (the length header) away.
                res: remove/part copy dest 8
                close dest
                res: decompose-msg res
                either check-msg res [return copy second res] [return none]
            ]
            if deferred [
                ; remember the open port so the reply can be collected later
                holder: make object! [
                    size: copy {}
                    ret-val: copy {}
                    port: dest
                ]
                deferred-index: deferred-index + 1
                append deferred-ports deferred-index
                append deferred-ports holder
                return deferred-index
            ]
            return true
        ] [
            ; do not leave a half-used connection open after a failure
            error? try [close dest]
            return none
        ]
    ]

    wait-for-result: func [
        {Block until the reply for deferred request INDEX has arrived. Returns the result block, or none when INDEX is unknown or the checksum fails.}
        index [integer!]
        /local res temp-object
    ] [
        temp-object: select deferred-ports index
        ; unknown index: return silently
        if not object? temp-object [return none]
        ; Read everything synchronously and throw the first 8 bytes away.
        res: remove/part copy temp-object/port 8
        close temp-object/port
        ; forget the index and its holder
        remove/part find deferred-ports index 2
        res: decompose-msg res
        either check-msg res [return copy second res] [return none]
    ]

    poll-for-result: func [
        {Collect whatever part of the reply for INDEX is available. Returns the result block when complete, false when more is pending, none on error or unknown index.}
        index [integer!]
        /local msg ret o size-read remaining
    ] [
        o: select deferred-ports index
        ; unknown index: return silently
        if not object? o [return none]

        ; 1) Read the 8-byte size header, possibly over several calls.
        if 8 > length? o/size [
            error? try [
                msg: copy/part o/port (8 - length? o/size)
                ; nothing read: leave the buffer alone rather than append
                ; the text of a none value
                if msg [append o/size msg]
            ]
        ]

        ; 2) Read the message body.
        if 8 = length? o/size [
            if error? try [
                size-read: length? msg: copy/part o/port to-integer o/size
            ] [return none]
            if 0 = size-read [return false]
            append o/ret-val msg
            remaining: (to-integer o/size) - size-read
            ; o/size must stay an 8-character string: the checks above use
            ; its length to tell header state from body state, so the
            ; outstanding byte count is stored zero-padded.
            o/size: fill-0 to-string remaining 8
            ; 3) Everything has arrived: finish up and hand back the result.
            if remaining = 0 [
                ret: decompose-msg o/ret-val
                error? try [close o/port]
                remove/part find deferred-ports index 2
                either check-msg ret [return copy second ret] [return none]
            ]
        ]
        return false
    ]
]
;Client test function. Shows how easy it is to do a remote exec
; Sends the sample echo request to the default server and returns the reply.
client-test: func [] [
    rugby-client/rexec [echo "Rugby is great!"]
]
;A rexec wrapper in the global environment
;Feature suggestion by Allen Kamp
rexec: func [
    {A remote exec like facility. Provides complete transparent remote messaging.}
    msg [any-block!]
    /with {Specify another port than the default of localhost:8001.}
    p [port!] {The other port}
    /oneway {Don't wait for a result. Returns true if the message was succefully delivered to the server.}
    /deferred {Check back later for the result and return immediately once the message has been delivered to the service. Returns an ID that can be used when checking if the result has arrived.}
    /local dest
] [
    dest: either with [p] [make port! tcp://localhost:8001]
    ; /oneway takes precedence over /deferred; plain calls wait for the result
    return either oneway [
        rugby-client/rexec/with/oneway msg dest
    ] [
        either deferred [
            rugby-client/rexec/with/deferred msg dest
        ] [
            rugby-client/rexec/with msg dest
        ]
    ]
]
; Global convenience wrapper around rugby-client/wait-for-result.
wait-for-result: func [
    {Wait for the result to arrive}
    index [integer!] {index of the result to wait for.}
] [
    return rugby-client/wait-for-result index
]
; Global convenience wrapper around rugby-client/poll-for-result.
; (A stray "Notes" token that followed the closing bracket has been removed;
; it was not part of the script and would have been evaluated as an
; undefined word.)
poll-for-result: func [
    {Poll if the result has arrived. Return false or the value (or none in case of an error).}
    index [integer!] {the index to poll for.}
] [
    rugby-client/poll-for-result index
]
; end of rugby_server.r