Cloud Haskell API


... [ Programmiersprachen und Sprachsysteme ] ... [ Cloud Haskell ] ... [ << Cloud Haskell Architektur ] ... [ Serialisierung >> ] ...

Basis


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
instance Monad Process
instance MonadIO Process

-- | Process identifier
data ProcessId = ProcessId { ... }

-- | Node identifier
data NodeId = NodeId { ... }

-- | Objects that can be sent across the network
class (Binary a, Typeable a) => Serializable a


Die Basis der Cloud Haskell API stellt die Process Monade dar. Alle verteilten Tätigkeiten finden in dieser Monade statt und wie Zeile 2 zeigt, stellt sie eine Instanz der IO Monade dar, wodurch sich jegliche IO Operationen auch verteilt durchführen lassen. Diese IO Operationen lassen sich dann etwa durch die Funktion liftIO als Process monadische Operation liften. Ein Prozess wird durch seinen ProcessId eindeutig identifiziert und eine Node durch die NodeId. 'Serializable a' stellt dabei die Basis für alle Typen dar, welche für die verteilte Kommunikation verwendet werden sollen. Diese müssen also ebenfalls Instanzen der Typklassen Typeable und Binary sein.


Process Managment


Die Basis für das Prozess Managment stellen die folgenden Funktionen dar:

1
2
3
4
5
6
7
8
9
spawn :: NodeId -> Closure (Process ()) -> Process ProcessId

newLocalNode :: Transport -> RemoteTable -> IO ()

terminate :: Process a

getSelfPid :: Process ProcessId

getSelfNode :: Process NodeId


Durch die Funktion spawn lässt sich synchron ein neuer Prozess durch explizite Platzierung auf einer bestimmten Node (NodeId) spawnen und mit der Ausführung einer Funktion (Closure) beauftragen. Solch eine Node lässt sich etwa durch die Funktion newLocalNode vornehmen, welche für einen bestimmten Transport Layer und einer sogenannten RemoteTable eine neue Node bereitstellt. Eine RemoteTable definiert dabei die öffentliche Schnittstelle zu einem Prozess.


Message Passing


Die Interprozesskommunikation in Cloud Haskell erfolgt durch Message Passing, welche durch folgende Basisfunktionen ermöglicht wird:

1
2
3
4
5
-- | Send a message
send :: Serializable a => ProcessId -> a -> Process ()

-- | Wait for a message of a specific type
expect :: Serializable a => Process a


Das Versenden von Nachrichten erfolgt über die send Funktion, welches den Empfänger in Form seiner ProcessId und einer Nachricht entgegen nimmt. Wie bereits zuvor beschrieben, muss es sich bei dem Typ dieser Nachricht um eine Instanz der Typklasse Serializable handeln. Auf Grund dessen, dass die Serialisierung durch Typklassen dem jeweiligen Entwickler überlassen wird, sind einige Daten bewusst nicht serialisierbar. Solche Daten wären z. B. Daten wie etwa ein File Handle oder aber MVar's. Der Versand einer Nachricht ist dabei verlässlich und stets geordnet. Das heißt, dass wenn Prozess A an Prozess B z. B. die Nachrichten [m1, m2, m3] versendet, erhält Prozess B in jedem Fall entweder alle Nachrichten in geordneter Reihenfolge oder aber einen beliebigen Prefix dieser. Um die lose Koppelung zwischen Actors zu garantieren, wird das Senden einer Nachricht niemals fehlschlagen. Der Aufruf kann jedoch blockieren, falls z. B. der Buffer bei einem TCP Verbindungsaufbau das Maximum an Kapazität erreicht hat. Die Rückkehr nach solch einer Blockierung impliziert also keine erfolgreiche Zustellung, wie fälschlicher Weise angenommen werden könnte. Stattdessen muss sich anderer Strategien bedient werden, um eine erfolgreiche Bestätigung zu erhalten. Eine Strategie wäre z. B. wie folgt:

1
2
send pid message
ack <- expect


Nach dem Versenden einer Nachricht wird auf eine Bestätigungs-Nachricht über die Methode expect gewartet. expect liefert dabei die vom Typ erstmögliche Nachricht aus der Message Queue und belässt den Rest der Nachrichten in der Queue. Der Typ der passenden Nachricht wird dabei über den jeweiligen Kontext inferiert. Die Message Queue kann dabei jegliche Art von Typ enthalten, welche eine Instanz der Typklasse Serializable darstellt.

Im Folgenden werden die bisher vorgestellten Konzepte anhand eines kurzen Beispiels verdeutlicht:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
...

data PingPongMessage = Ping ProcessId
                     | Pong ProcessId
                     deriving (Generic, Typeable)

$( derive makeBinary ''PingPongMessage )

pingServer :: Process ()
pingServer = forever $ do
    Ping from <- expect
    liftIO . putStrLn $ "Got ping from: " ++ show from
    self <- getSelfPid
    send from (Pong self)

pingClient :: ProcessId -> Process ()
pingClient serverPid = do
    self <- getSelfPid
    send serverPid (Ping self)
    Pong from <- expect
    liftIO . putStrLn $ "Got pong from: " ++ show from
    liftIO $ threadDelay (1*1000000)
    pingClient serverPid

pingClientBad :: ProcessId -> Process ()
pingClientBad serverPid = do
    self <- getSelfPid
    
    send serverPid (Pong self)

    liftIO . putStrLn $ "Got pong from: " ++ show serverPid
    liftIO $ threadDelay (1*1000000)
    pingClientBad serverPid

remotable ['pingServer, 'pingClient, 'pingClientBad]

master :: Backend -> [NodeId] -> Process ()
master backend slaves = do
    node <- getSelfNode

    serverPid <- spawn node $ staticClosure $(mkStatic 'pingServer)

    forM_ slaves $ \slave -> spawn slave $ $(mkClosure 'pingClient) (serverPid)
    
    liftIO $ threadDelay (1*1000000)

    let badSlave = head slaves
    _ <- spawn badSlave $ $(mkClosure 'pingClientBad) (serverPid)

    _ <- liftIO getLine
    terminateAllSlaves backend

configSimpleLocalnetBackend :: String -> String -> IO Backend
configSimpleLocalnetBackend host port = initializeBackend host port $ __remoteTable initRemoteTable

main :: IO ()
main = do
    let host = "localhost"
    args <- getArgs
    case args of
        ["master", port] -> do
            backend <- configSimpleLocalnetBackend host port
            startMaster backend $ master backend  
        ["slave", port] -> do
            putStrLn $ "Starting slave on port: " ++ port
            backend <- configSimpleLocalnetBackend host port
            startSlave backend
        other -> do
            putStrLn $ "Unkown command: " ++ show other


Für das Versenden von Nachrichten wird ein Datentyp PingPongMessage eingeführt (Zeile 3). Es gibt dabei verschiedene Möglichkeiten diesen Datentyp für die Serialisierung kompatibel zu machen. Einer dieser Wege wäre etwa durch die Template Funktion derive (Zeile 7). Wie in der Main Methode zu sehen, wird durch die vereinfachte Master/Slave Konfiguration des SimpleLocalNet Backends die Möglichkeit geboten, einen Master (Zeile 61) und mehrere Slaves (Zeile 64) zu starten. Die Konfiguration des Backends erfolgt dabei durch initializeBackend (Zeile 54), welches den Hostname, den Port und die Remote-Tabelle erwartet. Der Master erhält dabei automatisch die PID's aller bereits gestarteten Slaves in Form einer Liste, woraufhin er die Slave Prozesse auf den entsprechenden Node's spawnt (Zeile 43) und die Funktion pingClient ausführt. Wie hier außerdem zu sehen ist, wurde Template Haskell verwendet um über die Funktion mkClosure die Funktion pingClient verteilt in einem anderen Prozess ausführbar zu machen. An späterer Stelle wird auf den Ablauf der Serialisierung noch genauer eingegangen. Der Ping Client sendet dem Server eine Ping Nachricht (Zeile 19) und wartet dann auf die passende Antwort durch eine Pong Nachricht vom Server (Zeile 20). Der Zustand eines Prozesses wird, wie auch in Erlang, durch den Funktionsparameter für die nächsten Funktionsdurchläufe durchgereicht (Zeile 23). Dieses Verfahren ist üblich, da kein globaler Zustand für den Prozess besteht. Außerdem sollte auf Grund der potentiellen Langlebigkeit des Prozesses bei der Entwicklung stets endrekursiv entwickelt werden, um den sonst linear zur rekursionstiefe steigendem Speicherverbrauch zu vermeiden.
Der Ping-Server wird hingegen auf der eigenen Node gestartet (Zeile 41) und wartet auf eine entsprechende Ping Nachricht (Zeile 11), welche er dann mit einer Pong Nachricht (Zeile 14) beantwortet.
Ein spezieller Fall ist in Zeile 29 zu beobachten. Auf Grund des Summendatantyps PingPongMessage ist es an dieser Stelle möglich dem Server eine Pong Nachricht zu senden und diesen nach jetziger Implementierung zu einem Absturz zu führen. Dieses erfolgt dabei aus der Tatsache, dass die expect Funktion eine Nachricht aus der Message Queue entnimmt, welche dem Datentyp PingPongMessage entspricht und versucht dies dann entsprechend dem Ping zu zuordnen, welches jedoch zu einem Laufzeitfehler führt. An dieser Stelle führt also ein Summendatentyp nicht zu dem erwarteten Ergebnis und dieser müsste jeweils in zwei unterschiedliche Datentypen aufgeteilt werden. Eine andere Alternative wäre dabei z. B. die folgende:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
...

pingServer :: Process ()
pingServer = forever $ do
    m <- expect
    case m of
      Ping from -> do
        liftIO . putStrLn $ "Got ping from: " ++ show from
        self <- getSelfPid
        send from (Pong self)
      _ -> liftIO . putStrLn $ "Got other msg"

...



Advanced Messaging


Wie im letzten Abschnitt beschrieben, verbleiben ungematchte Nachrichten weiterhin in der Message Queue und können dazu führen, dass diese ein Maximum an Kapazität erreicht. Um jedoch eine Vielzahl unterschiedlicher Nachrichtentypen verarbeiten zu können, bedarf es einer anderen Möglichkeit diese passend zuordnen zu können. In Erlang besteht hierfür das receive Statement, welche durch Pattern matching den entsprechenden Kontrollblock ausführt. In Cloud Haskell wurde hierfür folgende Methoden eingeführt:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
-- | Test the matches in order against each message in the queue
receiveWait :: [Match b] -> Process b

-- | Like 'receiveWait' but with a timeout.
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)

-- | Match against any message of the right type
match :: forall a b. Serializable a => (a -> Process b) -> Match b

-- | Match against any message of the right type that satisfies a predicate
matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b

-- | Remove any message  from the queue
matchUnkown :: Process b -> Match b


Es wird also durch die Angabe einer Liste von Handler Funktionen jede Nachricht in der Message Queue gegen die Liste der Handler Funktionen so lange geprüft bis die Nachricht einer Funktion mit passendem Methodenrumpf zugeordnet werden kann. Alternativ wird die Nachricht sonst in der Message Queue belassen oder durch die Angabe der Funktion matchUnkown entfernt. Zusätzlich zu dem Matching anhand des passenden Typs besteht mit matchIf die Möglichkeit, ein Prädikat zur Prüfung der Nachricht zu übergeben, über welche sich Nachrichten gezielt anhand des Inhalts filtern lassen.
Mit diesen vorgestellten Konzepten lässt sich das PingPong Beispiel nun wie folgt umgestalten:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
...

newtype PingMessage = Ping ProcessId deriving (Binary, Typeable)
newtype PongMessage = Pong ProcessId deriving (Binary, Typeable)

pingServer :: Process ()
pingServer = forever $
    receiveWait [
        match (\(Ping from) -> do
                    liftIO . putStrLn $ "Got ping from: " ++ show from
                    self <- getSelfPid
                    send from (Pong self)),
        match (\(Pong _from) -> liftIO . putStrLn $ "Got other msg"),
        matchUnknown $ liftIO . putStrLn $ "Unkown msg"
        ]

...


Der vorher verwendete Summendatentyp wurde hier auf zwei separate geteilt (Zeile 3 und 4). Der Vorteil bei der Verwendung von newtype ergibt sich dabei daraus, dass Aufgrund der Tatsache das der zu Grunde liegende Datentyp ProcessId bereits serialisierbar ist und somit sowohl PingMessage als auch PongMessage automatisch serialisierbar sind. Ein templating wie zuvor entfällt somit.
Der Ping-Server wurde entsprechend angepasst und enthält nun die jeweils unterschiedlichen Arten in Form von Funktions Handlern, sowie für alle unbekannten Nachrichten die Funktion matchUnkown (Zeile 8 bis 15).


Typed Channels


Bisher wurden die Nachrichten jeweils direkt zwischen Prozessen gesendet und diese konnten von einem beliebigen Typ sein. Da das dynamische Typchecking der Nachrichten der jeweiligen Message Queue jedoch etwas Haskell untypisch ist und ein eher statisch typisierte Form der Message Queue gewünscht ist, wurde das Konzept der Typed Channels eingeführt. Die Typed Channels stellen durch statische Typisierung sicher, dass Prozesse ankommende Nachrichten auch unterstützen können. Dabei werden Nachrichten über einen speziell für den jeweiligen Typ erstellten Kommunikationskanal übertragen und nehmen die Stellung eines privaten Übertragungsweges zwischen Sender und dem Empfängerprozess ein. Der Kanal besteht aus zwei Endpunkten, dem SendPort und dem ReceivePort. Der SendPort lässt sich serialisieren und z. B. als eine Art Sendeknoten an andere Prozesse übertragen. Dieses gilt nicht für den ReceivePort. Der ReceivePort lässt sich nicht serialisieren, da sich ein Endpunkt nie ändern oder dupliziert werden darf. Die Basis API sieht wie folgt aus:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
data SendPort a    -- instance of Typeable, Binary
data ReceivePort a

-- | Create a new typed channel
newChan :: Serializable a => Process (SendPort a, ReceivePort a)

-- | Send a message on a typed channel
sendChan :: Serializable a => SendPort a -> a -> Process ()

-- | Wait for a message on a typed channel
receiveChan :: Serializable a => ReceivePort a -> Process a

-- | Merge a list of typed channels
mergePortBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)


newChan erzeugt einen neuen getypten Kanal und liefert die beiden zuvor beschrieben Endpunkte. sendChan und receiveChan lassen sich analog zu der einfachen Variante mit send und except ausführen, jedoch mit dem Unterschied, dass nun die jeweiligen Endpunkte und nicht die Empfänger PID als Parameter übergeben werden. Es folgt nun das durch Typed Channels modifizierte PingPong Beispiel:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
...

data PongMessage = Pong
                 deriving (Generic, Typeable)

data PingMessage = Ping (SendPort PongMessage)
                 deriving (Generic, Typeable)

...

pingServer :: ReceivePort PingMessage -> Process ()
pingServer rcvPort = do
    (Ping fromPort) <- receiveChan rcvPort
    liftIO . putStrLn $ "Got ping from: " ++ show fromPort
    sendChan fromPort Pong
    pingServer rcvPort

pingClient :: SendPort PingMessage -> Process ()
pingClient serverSPort = do
    (sendPort, rcvPort) <- newChan
    sendChan serverSPort (Ping sendPort)
    Pong <- receiveChan rcvPort
    
    liftIO . putStrLn $ "Got pong from server"
    liftIO $ threadDelay (1*1000000)
    pingClient serverSPort

...

master :: Backend -> [NodeId] -> Process ()
master backend slaves = do
    node <- getSelfNode
    (sendPort :: SendPort PingMessage, rcvPort :: ReceivePort PingMessage) <- newChan
    forM_ slaves $ \slave -> spawn slave $ $(mkClosure 'pingClient) (sendPort :: SendPort PingMessage)
    pingServer rcvPort

...


Die master Funktion unterscheidet sich nun vom Prozesserzeugungs Ablauf (Zeile 34). Der PingServer Kontext wird nun nicht mehr, wie zuvor gezeigt, in einem separaten Prozess, sondern im Kontext des Master-Prozesses direkt nach Erzeugung der jeweiligen PingClient Prozesse gespawnt und ausgeführt. Dies resultiert aus der Tatsache, dass der ReceivePort nicht serialisierbar ist. Den Clients werden also direkt beim spawnen der entsprechende Server-SendPort als Parameter übergeben, über welchen die Ping-Clients dem Server dann in Kombination mit deren neu erzeugtem SendPort (Pong-Antwortkanal Zeile 21) die Ping Nachricht schicken.
Wie in dem gezeigten Beispiel deutlich wird, verleitet diese Art von Messaging zu einem bestimmten Entwurfsmuster, indem für jegliche Antworten zwischen Prozessen ein privater Kommunikationsport erzeugt wird. Dies hat zum Vorteil, dass der Absender stets bekannt ist, was aber auch bedeutet, dass es nicht mehr möglich ist, einen Prozess zu spawnen und ihm direkt im Anschluss eine Nachricht mit Anweisungen zu schicken. Welche Art der Kommunikation jeweils gewählt wird, muss also vom Anwendungsfall abhängig gemacht werden.


Process Monitoring & Error Handling


In Cloud Haskell verhalten sich Prozesse auf Grund ihrer starken Kapselung fehlerresistent gegenüber anderen Prozessen. Operationen wie send oder spawn auf entfernten Nodes schlagen nicht fehl und man erhält keine Fehlerbenachrichtigung bei einem Fehlschlag. Der Absender hat bisher somit keine Möglichkeit im Fehlerfall entsprechend zu reagieren. Die Möglichkeit andere Prozesse zu überwachen, wurde also eingeführt, um über mögliche Ausnahmesituationen sowie Fehlerzustände mittels Message Passing informiert zu werden.

1
2
3
4
5
-- | Link to a remote process (asynchronous)
link :: ProcessId -> Process ()

-- | Unlink to a remote process (asynchronous)
unlink :: ProcessId -> Process ()


Mit der Funktion link wird eine unidirektionale Verbindung nach dem Prinzip "all or nothing" erstellt. Dies bedeutet, dass ein Link einmalig besteht und mehrfaches Ausführen von link zu keiner neuen Verbindung führt. Diese Verbindung führt dazu, dass sobald der beobachtete Prozess etwa durch einen Fehler ausfällt, der beobachtende Prozess ebenfalls ausfällt. Beide Operationen link und unlink werden dabei asynchron ausgeführt, welches zu unerwarteten Resultaten führen kann. Das folgende Beispiel zeigt einen solchen Fall.

1
2
3
link pidB          -- Link to process B
msgFromB <- expect -- Wait for a message from process B
unlink pidB        -- Unlink again


Der demonstrierte Code lässt fälschlicherweise vermuten, dass auf diese Weise eine Art von Monitoring vorgenommen werden kann, bei dem ein unendliches Warten einer Nachricht von Prozess pidB im Fehlerfall vermieden werden kann. Auf Grund der Asynchronität kann die tatsächliche Ausführungsreihenfolge der Operationen nicht garantiert werden, welches zu einer Race Condition führen kann.
Um der zuvor beschriebenen Situation entgegen zu steuern, besteht mit den Monitoren ein weiteres Konzept zur Prozessüberwachung.

1
2
3
4
5
-- | Monitor another process (asynchronous)
monitor :: ProcessId -> Process MonitorRef

-- | Remove a monitor
unmonitor :: MonitorRef -> Process ()


Solch ein Monitor lässt sich über die Funktion monitor erzeugen, welches bei Mehrfachausführung jeweils zu einer neuen Referenz führt. Der Unterschied zu Link besteht darin, dass nun Nachrichten bei Fehlerfällen verschickt werden, anstatt den Prozess ausfallen zu lassen. Es handelt sich dabei um sogenannte ProcessMonitorNotification Nachrichten, über welche sich auch der Grund (DiedNormal, DiedException, DiedDisconnect) des Beendens herleiten lässt. Durch das Konzept des Monitors lassen sich nun auch Aufgaben wie etwa Supervision abbilden, welches im Folgenden an dem modifizierten PingPong Beispiel ersichtlich wird.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
...

master :: Backend -> [NodeId] -> Process ()
master backend slaves = forever $ do
        node <- getSelfNode

        serverPid <- spawn node $ staticClosure $(mkStatic 'pingServer)

        _ <- monitor serverPid

        clients <- mapM (\slave -> spawn slave $ $(mkClosure 'pingClient) (serverPid)) slaves
        
        liftIO $ threadDelay (1*1000000)

        let badSlave = head slaves
        badClient <- spawn badSlave $ $(mkClosure 'pingClientBad) (serverPid)

        receiveWait [ match $ \(ProcessMonitorNotification _monitorRef pid reason) -> do
                                    liftIO . putStrLn $ "Process: " ++ (show pid) ++ " died with reason: " ++ (show reason)
                                    forM_ (badClient : clients) $ \cpid -> kill cpid "now"
                    ]

...


Der Master Prozess erzeugt nun einen Monitor (Zeile 9) auf den PingServer Prozess, durch welchen der Master Prozess bei Beenden des PingServer Prozesses eine Mitteilung erhält (Zeile 18). Bei Beenden werden anschließend alle PingClient Prozesse beendet, um dann die Master Logik erneut durchführen zu können (Zeile 20). Das gesamte PingPong Server/Client System wird somit erneut initialisiert und ausgeführt.


... [ Programmiersprachen und Sprachsysteme ] ... [ Cloud Haskell ] ... [ << Cloud Haskell Architektur ] ... [ Serialisierung >> ] ...
generated by schmidt-doku-generator (GitHub)