/*
|
* %CopyrightBegin%
|
*
|
* Copyright Ericsson AB 2000-2009. All Rights Reserved.
|
*
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
* you may not use this file except in compliance with the License.
|
* You may obtain a copy of the License at
|
*
|
* http://www.apache.org/licenses/LICENSE-2.0
|
*
|
* Unless required by applicable law or agreed to in writing, software
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* See the License for the specific language governing permissions and
|
* limitations under the License.
|
*
|
* %CopyrightEnd%
|
*/
|
package com.ericsson.otp.erlang;
|
|
import java.io.IOException;
|
|
/**
|
* Maintains a connection between a Java process and a remote Erlang, Java or C
|
* node. The object maintains connection state and allows data to be sent to and
|
* received from the peer.
|
* <p>
|
* <p>
|
* Once a connection is established between the local node and a remote node,
|
* the connection object can be used to send and receive messages between the
|
* nodes and make rpc calls (assuming that the remote node is a real Erlang
|
* node).
|
* <p>
|
* <p>
|
* The various receive methods are all blocking and will return only when a
|
* valid message has been received or an exception is raised.
|
* <p>
|
* <p>
|
* If an exception occurs in any of the methods in this class, the connection
|
* will be closed and must be explicitely reopened in order to resume
|
* communication with the peer.
|
* <p>
|
* <p>
|
* It is not possible to create an instance of this class directly.
|
* OtpConnection objects are returned by {@link OtpSelf#connect(OtpPeer)
|
* OtpSelf.connect()} and {@link OtpSelf#accept() OtpSelf.accept()}.
|
*/
|
public class OtpConnection extends AbstractConnection {
|
protected OtpSelf self;
|
protected GenericQueue queue; // messages get delivered here
|
|
public OtpConnection() {
|
}
|
|
/*
|
* Accept an incoming connection from a remote node. Used by {@link
|
* OtpSelf#accept() OtpSelf.accept()} to create a connection based on data
|
* received when handshaking with the peer node, when the remote node is the
|
* connection intitiator.
|
*
|
* @exception java.io.IOException if it was not possible to connect to the
|
* peer.
|
*
|
* @exception OtpAuthException if handshake resulted in an authentication
|
* error
|
*/
|
// package scope
|
public OtpConnection(final OtpSelf self, final OtpTransport s)
|
throws IOException, OtpAuthException {
|
super(self, s);
|
this.self = self;
|
queue = new GenericQueue();
|
start();
|
}
|
|
/*
|
* Intiate and open a connection to a remote node.
|
*
|
* @exception java.io.IOException if it was not possible to connect to the
|
* peer.
|
*
|
* @exception OtpAuthException if handshake resulted in an authentication
|
* error.
|
*/
|
// package scope
|
public OtpConnection(final OtpSelf self, final OtpPeer other) throws IOException,
|
OtpAuthException {
|
super(self, other);
|
this.self = self;
|
queue = new GenericQueue();
|
start();
|
}
|
|
public void deliver(final Exception e) {
|
queue.put(e);
|
}
|
|
public void deliver(final OtpMsg msg) {
|
queue.put(msg);
|
}
|
|
/**
|
* Get information about the node at the peer end of this connection.
|
*
|
* @return the {@link OtpPeer Node} representing the peer node.
|
*/
|
public OtpPeer peer() {
|
return peer;
|
}
|
|
/**
|
* Get information about the node at the local end of this connection.
|
*
|
* @return the {@link OtpSelf Node} representing the local node.
|
*/
|
public OtpSelf self() {
|
return self;
|
}
|
|
/**
|
* Return the number of messages currently waiting in the receive queue for
|
* this connection.
|
*/
|
public int msgCount() {
|
return queue.getCount();
|
}
|
|
/**
|
* Receive a message from a remote process. This method blocks until a valid
|
* message is received or an exception is raised.
|
* <p>
|
* <p>
|
* If the remote node sends a message that cannot be decoded properly, the
|
* connection is closed and the method throws an exception.
|
*
|
* @return an object containing a single Erlang term.
|
* @throws java.io.IOException if the connection is not active or a communication error
|
* occurs.
|
* @throws OtpErlangExit if an exit signal is received from a process on the peer
|
* node.
|
* @throws OtpAuthException if the remote node sends a message containing an invalid
|
* cookie.
|
*/
|
public OtpErlangObject receive() throws IOException, OtpErlangExit,
|
OtpAuthException {
|
try {
|
return receiveMsg().getMsg();
|
} catch (final OtpErlangDecodeException e) {
|
close();
|
throw new IOException(e.getMessage());
|
}
|
}
|
|
/**
|
* Receive a message from a remote process. This method blocks at most for
|
* the specified time, until a valid message is received or an exception is
|
* raised.
|
* <p>
|
* <p>
|
* If the remote node sends a message that cannot be decoded properly, the
|
* connection is closed and the method throws an exception.
|
*
|
* @param timeout the time in milliseconds that this operation will block.
|
* Specify 0 to poll the queue.
|
* @return an object containing a single Erlang term.
|
* @throws java.io.IOException if the connection is not active or a communication error
|
* occurs.
|
* @throws OtpErlangExit if an exit signal is received from a process on the peer
|
* node.
|
* @throws OtpAuthException if the remote node sends a message containing an invalid
|
* cookie.
|
* @throws InterruptedException if no message if the method times out before a message
|
* becomes available.
|
*/
|
public OtpErlangObject receive(final long timeout)
|
throws InterruptedException, IOException, OtpErlangExit,
|
OtpAuthException {
|
try {
|
return receiveMsg(timeout).getMsg();
|
} catch (final OtpErlangDecodeException e) {
|
close();
|
throw new IOException(e.getMessage());
|
}
|
}
|
|
/**
|
* Receive a raw (still encoded) message from a remote process. This message
|
* blocks until a valid message is received or an exception is raised.
|
* <p>
|
* <p>
|
* If the remote node sends a message that cannot be decoded properly, the
|
* connection is closed and the method throws an exception.
|
*
|
* @return an object containing a raw (still encoded) Erlang term.
|
* @throws java.io.IOException if the connection is not active or a communication error
|
* occurs.
|
* @throws OtpErlangExit if an exit signal is received from a process on the peer
|
* node, or if the connection is lost for any reason.
|
* @throws OtpAuthException if the remote node sends a message containing an invalid
|
* cookie.
|
*/
|
public OtpInputStream receiveBuf() throws IOException, OtpErlangExit,
|
OtpAuthException {
|
return receiveMsg().getMsgBuf();
|
}
|
|
/**
|
* Receive a raw (still encoded) message from a remote process. This message
|
* blocks at most for the specified time until a valid message is received
|
* or an exception is raised.
|
* <p>
|
* <p>
|
* If the remote node sends a message that cannot be decoded properly, the
|
* connection is closed and the method throws an exception.
|
*
|
* @param timeout the time in milliseconds that this operation will block.
|
* Specify 0 to poll the queue.
|
* @return an object containing a raw (still encoded) Erlang term.
|
* @throws java.io.IOException if the connection is not active or a communication error
|
* occurs.
|
* @throws OtpErlangExit if an exit signal is received from a process on the peer
|
* node, or if the connection is lost for any reason.
|
* @throws OtpAuthException if the remote node sends a message containing an invalid
|
* cookie.
|
* @throws InterruptedException if no message if the method times out before a message
|
* becomes available.
|
*/
|
public OtpInputStream receiveBuf(final long timeout)
|
throws InterruptedException, IOException, OtpErlangExit,
|
OtpAuthException {
|
return receiveMsg(timeout).getMsgBuf();
|
}
|
|
/**
|
* Receive a messge complete with sender and recipient information.
|
*
|
* @return an {@link OtpMsg OtpMsg} containing the header information about
|
* the sender and recipient, as well as the actual message contents.
|
* @throws java.io.IOException if the connection is not active or a communication error
|
* occurs.
|
* @throws OtpErlangExit if an exit signal is received from a process on the peer
|
* node, or if the connection is lost for any reason.
|
* @throws OtpAuthException if the remote node sends a message containing an invalid
|
* cookie.
|
*/
|
public OtpMsg receiveMsg() throws IOException, OtpErlangExit,
|
OtpAuthException {
|
final Object o = queue.get();
|
if (o instanceof OtpMsg) {
|
return (OtpMsg) o;
|
} else if (o instanceof IOException) {
|
throw (IOException) o;
|
} else if (o instanceof OtpErlangExit) {
|
throw (OtpErlangExit) o;
|
} else if (o instanceof OtpAuthException) {
|
throw (OtpAuthException) o;
|
}
|
return null;
|
}
|
|
/**
|
* Receive a messge complete with sender and recipient information. This
|
* method blocks at most for the specified time.
|
*
|
* @param timeout the time in milliseconds that this operation will block.
|
* Specify 0 to poll the queue.
|
* @return an {@link OtpMsg OtpMsg} containing the header information about
|
* the sender and recipient, as well as the actual message contents.
|
* @throws java.io.IOException if the connection is not active or a communication error
|
* occurs.
|
* @throws OtpErlangExit if an exit signal is received from a process on the peer
|
* node, or if the connection is lost for any reason.
|
* @throws OtpAuthException if the remote node sends a message containing an invalid
|
* cookie.
|
* @throws InterruptedException if no message if the method times out before a message
|
* becomes available.
|
*/
|
public OtpMsg receiveMsg(final long timeout) throws InterruptedException,
|
IOException, OtpErlangExit, OtpAuthException {
|
final Object o = queue.get(timeout);
|
if (o instanceof OtpMsg) {
|
return (OtpMsg) o;
|
} else if (o instanceof IOException) {
|
throw (IOException) o;
|
} else if (o instanceof OtpErlangExit) {
|
throw (OtpErlangExit) o;
|
} else if (o instanceof OtpAuthException) {
|
throw (OtpAuthException) o;
|
}
|
return null;
|
}
|
|
/**
|
* Send a message to a process on a remote node.
|
*
|
* @param dest the Erlang PID of the remote process.
|
* @param msg the message to send.
|
* @throws java.io.IOException if the connection is not active or a communication error
|
* occurs.
|
*/
|
@SuppressWarnings("resource")
|
public void send(final OtpErlangPid dest, final OtpErlangObject msg)
|
throws IOException {
|
// encode and send the message
|
super.sendBuf(self.pid(), dest, new OtpOutputStream(msg));
|
}
|
|
/**
|
* Send a message to a named process on a remote node.
|
*
|
* @param dest the name of the remote process.
|
* @param msg the message to send.
|
* @throws java.io.IOException if the connection is not active or a communication error
|
* occurs.
|
*/
|
@SuppressWarnings("resource")
|
public void send(final String dest, final OtpErlangObject msg)
|
throws IOException {
|
// encode and send the message
|
super.sendBuf(self.pid(), dest, new OtpOutputStream(msg));
|
}
|
|
/**
|
* Send a pre-encoded message to a named process on a remote node.
|
*
|
* @param dest the name of the remote process.
|
* @param payload the encoded message to send.
|
* @throws java.io.IOException if the connection is not active or a communication error
|
* occurs.
|
*/
|
public void sendBuf(final String dest, final OtpOutputStream payload)
|
throws IOException {
|
super.sendBuf(self.pid(), dest, payload);
|
}
|
|
/**
|
* Send a pre-encoded message to a process on a remote node.
|
*
|
* @param dest the Erlang PID of the remote process.
|
* @param payload the encoded message to send.
|
* @throws java.io.IOException if the connection is not active or a communication error
|
* occurs.
|
*/
|
public void sendBuf(final OtpErlangPid dest, final OtpOutputStream payload)
|
throws IOException {
|
super.sendBuf(self.pid(), dest, payload);
|
}
|
|
/**
|
* Send an RPC request to the remote Erlang node. This convenience function
|
* creates the following message and sends it to 'rex' on the remote node:
|
* <p>
|
* <pre>
|
* { self, { call, Mod, Fun, Args, user } }
|
* </pre>
|
* <p>
|
* <p>
|
* Note that this method has unpredicatble results if the remote node is not
|
* an Erlang node.
|
* </p>
|
*
|
* @param mod the name of the Erlang module containing the function to be
|
* called.
|
* @param fun the name of the function to call.
|
* @param args an array of Erlang terms, to be used as arguments to the
|
* function.
|
* @throws java.io.IOException if the connection is not active or a communication error
|
* occurs.
|
*/
|
public void sendRPC(final String mod, final String fun,
|
final OtpErlangObject[] args) throws IOException {
|
sendRPC(mod, fun, new OtpErlangList(args));
|
}
|
|
/**
|
* Send an RPC request to the remote Erlang node. This convenience function
|
* creates the following message and sends it to 'rex' on the remote node:
|
* <p>
|
* <pre>
|
* { self, { call, Mod, Fun, Args, user } }
|
* </pre>
|
* <p>
|
* <p>
|
* Note that this method has unpredicatble results if the remote node is not
|
* an Erlang node.
|
* </p>
|
*
|
* @param mod the name of the Erlang module containing the function to be
|
* called.
|
* @param fun the name of the function to call.
|
* @param args a list of Erlang terms, to be used as arguments to the
|
* function.
|
* @throws java.io.IOException if the connection is not active or a communication error
|
* occurs.
|
*/
|
public void sendRPC(final String mod, final String fun,
|
final OtpErlangList args) throws IOException {
|
final OtpErlangObject[] rpc = new OtpErlangObject[2];
|
final OtpErlangObject[] call = new OtpErlangObject[5];
|
/* {self, { call, Mod, Fun, Args, user}} */
|
call[0] = new OtpErlangAtom("call");
|
call[1] = new OtpErlangAtom(mod);
|
call[2] = new OtpErlangAtom(fun);
|
call[3] = args;
|
call[4] = new OtpErlangAtom("user");
|
rpc[0] = self.pid();
|
rpc[1] = new OtpErlangTuple(call);
|
send("rex", new OtpErlangTuple(rpc));
|
}
|
|
/**
|
* Receive an RPC reply from the remote Erlang node. This convenience
|
* function receives a message from the remote node, and expects it to have
|
* the following format:
|
* <p>
|
* <pre>
|
* { rex, Term }
|
* </pre>
|
*
|
* @return the second element of the tuple if the received message is a
|
* two-tuple, otherwise null. No further error checking is
|
* performed.
|
* @throws java.io.IOException if the connection is not active or a communication error
|
* occurs.
|
* @throws OtpErlangExit if an exit signal is received from a process on the peer
|
* node.
|
* @throws OtpAuthException if the remote node sends a message containing an invalid
|
* cookie.
|
*/
|
public OtpErlangObject receiveRPC() throws IOException, OtpErlangExit,
|
OtpAuthException {
|
final OtpErlangObject msg = receive();
|
if (msg instanceof OtpErlangTuple) {
|
final OtpErlangTuple t = (OtpErlangTuple) msg;
|
if (t.arity() == 2) {
|
return t.elementAt(1); // obs: second element
|
}
|
}
|
return null;
|
}
|
|
/**
|
* Create a link between the local node and the specified process on the
|
* remote node. If the link is still active when the remote process
|
* terminates, an exit signal will be sent to this connection. Use
|
* {@link #unlink unlink()} to remove the link.
|
*
|
* @param dest the Erlang PID of the remote process.
|
* @throws java.io.IOException if the connection is not active or a communication error
|
* occurs.
|
*/
|
public void link(final OtpErlangPid dest) throws IOException {
|
super.sendLink(self.pid(), dest);
|
}
|
|
/**
|
* Remove a link between the local node and the specified process on the
|
* remote node. This method deactivates links created with {@link #link
|
* link()}.
|
*
|
* @param dest the Erlang PID of the remote process.
|
* @throws java.io.IOException if the connection is not active or a communication error
|
* occurs.
|
*/
|
public void unlink(final OtpErlangPid dest) throws IOException {
|
super.sendUnlink(self.pid(), dest);
|
}
|
|
/**
|
* Send an exit signal to a remote process.
|
*
|
* @param dest the Erlang PID of the remote process.
|
* @param reason an Erlang term describing the exit reason.
|
* @throws java.io.IOException if the connection is not active or a communication error
|
* occurs.
|
*/
|
public void exit(final OtpErlangPid dest, final OtpErlangObject reason)
|
throws IOException {
|
super.sendExit2(self.pid(), dest, reason);
|
}
|
}
|