/*
|
* %CopyrightBegin%
|
*
|
* Copyright Ericsson AB 2000-2012. All Rights Reserved.
|
*
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
* you may not use this file except in compliance with the License.
|
* You may obtain a copy of the License at
|
*
|
* http://www.apache.org/licenses/LICENSE-2.0
|
*
|
* Unless required by applicable law or agreed to in writing, software
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* See the License for the specific language governing permissions and
|
* limitations under the License.
|
*
|
* %CopyrightEnd%
|
*/
|
package com.ericsson.otp.erlang;
|
|
import java.io.IOException;
|
import java.lang.ref.WeakReference;
|
import java.util.Collection;
|
import java.util.Enumeration;
|
import java.util.Hashtable;
|
import java.util.Iterator;
|
|
/**
|
* <p>
|
* Represents a local OTP node. This class is used when you do not wish to
|
* manage connections yourself - outgoing connections are established as needed,
|
* and incoming connections accepted automatically. This class supports the use
|
* of a mailbox API for communication, while management of the underlying
|
* communication mechanism is automatic and hidden from the application
|
* programmer.
|
* </p>
|
* <p>
|
* <p>
|
* Once an instance of this class has been created, obtain one or more mailboxes
|
* in order to send or receive messages. The first message sent to a given node
|
* will cause a connection to be set up to that node. Any messages received will
|
* be delivered to the appropriate mailboxes.
|
* </p>
|
* <p>
|
* <p>
|
* To shut down the node, call {@link #close close()}. This will prevent the
|
* node from accepting additional connections and it will cause all existing
|
* connections to be closed. Any unread messages in existing mailboxes can still
|
* be read, however no new messages will be delivered to the mailboxes.
|
* </p>
|
* <p>
|
* <p>
|
* Note that the use of this class requires that Epmd (Erlang Port Mapper
|
* Daemon) is running on each cooperating host. This class does not start Epmd
|
* automatically as Erlang does, you must start it manually or through some
|
* other means. See the Erlang documentation for more information about this.
|
* </p>
|
*/
|
public class OtpNode extends OtpLocalNode {
|
// keep track of all connections
|
Hashtable<String, OtpCookedConnection> connections = null;
|
// keep track of all mailboxes
|
Mailboxes mboxes = null;
|
// handle status changes
|
OtpNodeStatus handler;
|
private boolean initDone = false;
|
// thread to manage incoming connections
|
private Acceptor acceptor = null;
|
// flags
|
private int connFlags = 0;
|
|
/**
|
* <p>
|
* Create a node using the default cookie. The default cookie is found by
|
* reading the first line of the .erlang.cookie file in the user's home
|
* directory. The home directory is obtained from the System property
|
* "user.home".
|
* </p>
|
* <p>
|
* <p>
|
* If the file does not exist, an empty string is used. This method makes no
|
* attempt to create the file.
|
* </p>
|
*
|
* @param node the name of this node.
|
* @throws IOException if communication could not be initialized.
|
*/
|
public OtpNode(final String node) throws IOException {
|
super(node);
|
init(0);
|
}
|
|
/**
|
* <p>
|
* Create a node using the default cookie. The default cookie is found by
|
* reading the first line of the .erlang.cookie file in the user's home
|
* directory. The home directory is obtained from the System property
|
* "user.home".
|
* </p>
|
* <p>
|
* <p>
|
* If the file does not exist, an empty string is used. This method makes no
|
* attempt to create the file.
|
* </p>
|
*
|
* @param node the name of this node.
|
* @param transportFactory the transport factory to use when creating connections.
|
* @throws IOException if communication could not be initialized.
|
*/
|
public OtpNode(final String node,
|
final OtpTransportFactory transportFactory) throws IOException {
|
super(node, transportFactory);
|
init(0);
|
}
|
|
/**
|
* Create a node.
|
*
|
* @param node the name of this node.
|
* @param cookie the authorization cookie that will be used by this node when
|
* it communicates with other nodes.
|
* @throws IOException if communication could not be initialized.
|
*/
|
public OtpNode(final String node, final String cookie) throws IOException {
|
this(node, cookie, 0);
|
}
|
|
/**
|
* Create a node.
|
*
|
* @param node the name of this node.
|
* @param cookie the authorization cookie that will be used by this node when
|
* it communicates with other nodes.
|
* @param transportFactory the transport factory to use when creating connections.
|
* @throws IOException if communication could not be initialized.
|
*/
|
public OtpNode(final String node, final String cookie,
|
final OtpTransportFactory transportFactory) throws IOException {
|
this(node, cookie, 0, transportFactory);
|
}
|
|
/**
|
* Create a node.
|
*
|
* @param node the name of this node.
|
* @param cookie the authorization cookie that will be used by this node when
|
* it communicates with other nodes.
|
* @param port the port number you wish to use for incoming connections.
|
* Specifying 0 lets the system choose an available port.
|
* @throws IOException if communication could not be initialized.
|
*/
|
public OtpNode(final String node, final String cookie, final int port)
|
throws IOException {
|
super(node, cookie);
|
init(port);
|
}
|
|
/**
|
* Create a node.
|
*
|
* @param node the name of this node.
|
* @param cookie the authorization cookie that will be used by this node when
|
* it communicates with other nodes.
|
* @param port the port number you wish to use for incoming connections.
|
* Specifying 0 lets the system choose an available port.
|
* @param transportFactory the transport factory to use when creating connections.
|
* @throws IOException if communication could not be initialized.
|
*/
|
public OtpNode(final String node, final String cookie, final int port,
|
final OtpTransportFactory transportFactory) throws IOException {
|
super(node, cookie, transportFactory);
|
init(port);
|
}
|
|
private synchronized void init(final int aport) throws IOException {
|
if (!initDone) {
|
connections = new Hashtable<String, OtpCookedConnection>(17,
|
(float) 0.95);
|
mboxes = new Mailboxes();
|
acceptor = new Acceptor(aport);
|
initDone = true;
|
}
|
}
|
|
/**
|
* Close the node. Unpublish the node from Epmd (preventing new connections)
|
* and close all existing connections.
|
*/
|
public synchronized void close() {
|
acceptor.quit();
|
OtpCookedConnection conn;
|
final Collection<OtpCookedConnection> coll = connections.values();
|
final Iterator<OtpCookedConnection> it = coll.iterator();
|
mboxes.clear();
|
while (it.hasNext()) {
|
conn = it.next();
|
it.remove();
|
conn.close();
|
}
|
initDone = false;
|
}
|
|
protected void finalize() {
|
close();
|
}
|
|
/**
|
* Create an unnamed {@link OtpMbox mailbox} that can be used to send and
|
* receive messages with other, similar mailboxes and with Erlang processes.
|
* Messages can be sent to this mailbox by using its associated
|
* {@link OtpMbox#self() pid}.
|
*
|
* @return a mailbox.
|
*/
|
public OtpMbox createMbox() {
|
return mboxes.create();
|
}
|
|
/**
|
* Close the specified mailbox with reason 'normal'.
|
*
|
* @param mbox the mailbox to close.
|
* <p>
|
* <p>
|
* After this operation, the mailbox will no longer be able to
|
* receive messages. Any delivered but as yet unretrieved
|
* messages can still be retrieved however.
|
* </p>
|
* <p>
|
* <p>
|
* If there are links from the mailbox to other
|
* {@link OtpErlangPid pids}, they will be broken when this
|
* method is called and exit signals with reason 'normal' will be
|
* sent.
|
* </p>
|
*/
|
public void closeMbox(final OtpMbox mbox) {
|
closeMbox(mbox, new OtpErlangAtom("normal"));
|
}
|
|
/**
|
* Close the specified mailbox with the given reason.
|
*
|
* @param mbox the mailbox to close.
|
* @param reason an Erlang term describing the reason for the termination.
|
* <p>
|
* <p>
|
* After this operation, the mailbox will no longer be able to
|
* receive messages. Any delivered but as yet unretrieved
|
* messages can still be retrieved however.
|
* </p>
|
* <p>
|
* <p>
|
* If there are links from the mailbox to other
|
* {@link OtpErlangPid pids}, they will be broken when this
|
* method is called and exit signals with the given reason will
|
* be sent.
|
* </p>
|
*/
|
public void closeMbox(final OtpMbox mbox, final OtpErlangObject reason) {
|
if (mbox != null) {
|
mboxes.remove(mbox);
|
mbox.name = null;
|
mbox.breakLinks(reason);
|
}
|
}
|
|
/**
|
* Create an named mailbox that can be used to send and receive messages
|
* with other, similar mailboxes and with Erlang processes. Messages can be
|
* sent to this mailbox by using its registered name or the associated
|
* {@link OtpMbox#self() pid}.
|
*
|
* @param name a name to register for this mailbox. The name must be unique
|
* within this OtpNode.
|
* @return a mailbox, or null if the name was already in use.
|
*/
|
public OtpMbox createMbox(final String name) {
|
return mboxes.create(name);
|
}
|
|
/**
|
* <p>
|
* Register or remove a name for the given mailbox. Registering a name for a
|
* mailbox enables others to send messages without knowing the
|
* {@link OtpErlangPid pid} of the mailbox. A mailbox can have at most one
|
* name; if the mailbox already had a name, calling this method will
|
* supercede that name.
|
* </p>
|
*
|
* @param name the name to register for the mailbox. Specify null to
|
* unregister the existing name from this mailbox.
|
* @param mbox the mailbox to associate with the name.
|
* @return true if the name was available, or false otherwise.
|
*/
|
public boolean registerName(final String name, final OtpMbox mbox) {
|
return mboxes.register(name, mbox);
|
}
|
|
/**
|
* Get a list of all known registered names on this node.
|
*
|
* @return an array of Strings, containins all known registered names on
|
* this node.
|
*/
|
public String[] getNames() {
|
return mboxes.names();
|
}
|
|
/**
|
* Determine the {@link OtpErlangPid pid} corresponding to a registered name
|
* on this node.
|
*
|
* @return the {@link OtpErlangPid pid} corresponding to the registered
|
* name, or null if the name is not known on this node.
|
*/
|
public OtpErlangPid whereis(final String name) {
|
final OtpMbox m = mboxes.get(name);
|
if (m != null) {
|
return m.self();
|
}
|
return null;
|
}
|
|
/**
|
* Register interest in certain system events. The {@link OtpNodeStatus
|
* OtpNodeStatus} handler object contains callback methods, that will be
|
* called when certain events occur.
|
*
|
* @param ahandler the callback object to register. To clear the handler, specify
|
* null as the handler to use.
|
*/
|
public synchronized void registerStatusHandler(final OtpNodeStatus ahandler) {
|
handler = ahandler;
|
}
|
|
/**
|
* <p>
|
* Determine if another node is alive. This method has the side effect of
|
* setting up a connection to the remote node (if possible). Only a single
|
* outgoing message is sent; the timeout is how long to wait for a response.
|
* </p>
|
* <p>
|
* <p>
|
* Only a single attempt is made to connect to the remote node, so for
|
* example it is not possible to specify an extremely long timeout and
|
* expect to be notified when the node eventually comes up. If you wish to
|
* wait for a remote node to be started, the following construction may be
|
* useful:
|
* </p>
|
* <p>
|
* <pre>
|
* // ping every 2 seconds until positive response
|
* while (!me.ping(him, 2000))
|
* ;
|
* </pre>
|
*
|
* @param anode the name of the node to ping.
|
* @param timeout the time, in milliseconds, to wait for response before
|
* returning false.
|
* @return true if the node was alive and the correct ping response was
|
* returned. false if the correct response was not returned on time.
|
*/
|
/*
|
* internal info about the message formats...
|
*
|
* the request: -> REG_SEND {6,#Pid<bingo@aule.1.0>,'',net_kernel}
|
* {'$gen_call',{#Pid<bingo@aule.1.0>,#Ref<bingo@aule.2>},{is_auth,bingo@aule}}
|
*
|
* the reply: <- SEND {2,'',#Pid<bingo@aule.1.0>} {#Ref<bingo@aule.2>,yes}
|
*/
|
public boolean ping(final String anode, final long timeout) {
|
if (anode.equals(node)) {
|
return true;
|
} else if (anode.indexOf('@', 0) < 0
|
&& anode.equals(node.substring(0, node.indexOf('@', 0)))) {
|
return true;
|
}
|
// other node
|
OtpMbox mbox = null;
|
try {
|
mbox = createMbox();
|
mbox.send("net_kernel", anode, getPingTuple(mbox));
|
final OtpErlangObject reply = mbox.receive(timeout);
|
final OtpErlangTuple t = (OtpErlangTuple) reply;
|
final OtpErlangAtom a = (OtpErlangAtom) t.elementAt(1);
|
return "yes".equals(a.atomValue());
|
} catch (final Exception e) {
|
} finally {
|
closeMbox(mbox);
|
}
|
return false;
|
}
|
|
/* create the outgoing ping message */
|
private OtpErlangTuple getPingTuple(final OtpMbox mbox) {
|
final OtpErlangObject[] ping = new OtpErlangObject[3];
|
final OtpErlangObject[] pid = new OtpErlangObject[2];
|
final OtpErlangObject[] anode = new OtpErlangObject[2];
|
pid[0] = mbox.self();
|
pid[1] = createRef();
|
anode[0] = new OtpErlangAtom("is_auth");
|
anode[1] = new OtpErlangAtom(node());
|
ping[0] = new OtpErlangAtom("$gen_call");
|
ping[1] = new OtpErlangTuple(pid);
|
ping[2] = new OtpErlangTuple(anode);
|
return new OtpErlangTuple(ping);
|
}
|
|
/*
|
* this method simulates net_kernel only for the purpose of replying to
|
* pings.
|
*/
|
private boolean netKernel(final OtpMsg m) {
|
OtpMbox mbox = null;
|
try {
|
final OtpErlangTuple t = (OtpErlangTuple) m.getMsg();
|
final OtpErlangTuple req = (OtpErlangTuple) t.elementAt(1); // actual
|
// request
|
final OtpErlangPid pid = (OtpErlangPid) req.elementAt(0); // originating
|
// pid
|
final OtpErlangObject[] pong = new OtpErlangObject[2];
|
pong[0] = req.elementAt(1); // his #Ref
|
pong[1] = new OtpErlangAtom("yes");
|
mbox = createMbox();
|
mbox.send(pid, new OtpErlangTuple(pong));
|
return true;
|
} catch (final Exception e) {
|
} finally {
|
closeMbox(mbox);
|
}
|
return false;
|
}
|
|
/*
|
* OtpCookedConnection delivers messages here return true if message was
|
* delivered successfully, or false otherwise.
|
*/
|
boolean deliver(final OtpMsg m) {
|
OtpMbox mbox;
|
try {
|
final int t = m.type();
|
if (t == OtpMsg.regSendTag) {
|
final String name = m.getRecipientName();
|
/* special case for netKernel requests */
|
if (name.equals("net_kernel")) {
|
return netKernel(m);
|
}
|
mbox = mboxes.get(name);
|
} else {
|
mbox = mboxes.get(m.getRecipientPid());
|
}
|
if (mbox == null) {
|
return false;
|
}
|
mbox.deliver(m);
|
} catch (final Exception e) {
|
return false;
|
}
|
return true;
|
}
|
|
/*
|
* OtpCookedConnection delivers errors here, we send them on to the handler
|
* specified by the application
|
*/
|
void deliverError(final OtpCookedConnection conn, final Exception e) {
|
removeConnection(conn);
|
remoteStatus(conn.name, false, e);
|
}
|
|
/*
|
* find or create a connection to the given node
|
*/
|
OtpCookedConnection getConnection(final String anode) {
|
OtpPeer peer;
|
OtpCookedConnection conn;
|
synchronized (connections) {
|
// first just try looking up the name as-is
|
conn = connections.get(anode);
|
if (conn == null) {
|
// in case node had no '@' add localhost info and try again
|
peer = new OtpPeer(anode);
|
conn = connections.get(peer.node());
|
if (conn == null) {
|
try {
|
conn = new OtpCookedConnection(this, peer);
|
conn.setFlags(connFlags);
|
addConnection(conn);
|
} catch (final Exception e) {
|
/* false = outgoing */
|
connAttempt(peer.node(), false, e);
|
}
|
}
|
}
|
return conn;
|
}
|
}
|
|
void addConnection(final OtpCookedConnection conn) {
|
if (conn != null && conn.name != null) {
|
connections.put(conn.name, conn);
|
remoteStatus(conn.name, true, null);
|
}
|
}
|
|
private void removeConnection(final OtpCookedConnection conn) {
|
if (conn != null && conn.name != null) {
|
connections.remove(conn.name);
|
}
|
}
|
|
/* use these wrappers to call handler functions */
|
private synchronized void remoteStatus(final String anode,
|
final boolean up, final Object info) {
|
if (handler == null) {
|
return;
|
}
|
try {
|
handler.remoteStatus(anode, up, info);
|
} catch (final Exception e) {
|
}
|
}
|
|
synchronized void localStatus(final String anode, final boolean up,
|
final Object info) {
|
if (handler == null) {
|
return;
|
}
|
try {
|
handler.localStatus(anode, up, info);
|
} catch (final Exception e) {
|
}
|
}
|
|
synchronized void connAttempt(final String anode, final boolean incoming,
|
final Object info) {
|
if (handler == null) {
|
return;
|
}
|
try {
|
handler.connAttempt(anode, incoming, info);
|
} catch (final Exception e) {
|
}
|
}
|
|
public void setFlags(final int flags) {
|
connFlags = flags;
|
}
|
|
/*
|
* this class used to wrap the mailbox hashtables so we can use weak
|
* references
|
*/
|
public class Mailboxes {
|
// mbox pids here
|
private Hashtable<OtpErlangPid, WeakReference<OtpMbox>> byPid;
|
// mbox names here
|
private Hashtable<String, WeakReference<OtpMbox>> byName;
|
|
public Mailboxes() {
|
byPid = new Hashtable<OtpErlangPid, WeakReference<OtpMbox>>(17,
|
(float) 0.95);
|
byName = new Hashtable<String, WeakReference<OtpMbox>>(17,
|
(float) 0.95);
|
}
|
|
public OtpMbox create(final String name) {
|
OtpMbox m;
|
synchronized (byName) {
|
if (get(name) != null) {
|
return null;
|
}
|
final OtpErlangPid pid = createPid();
|
m = new OtpMbox(OtpNode.this, pid, name);
|
byPid.put(pid, new WeakReference<OtpMbox>(m));
|
byName.put(name, new WeakReference<OtpMbox>(m));
|
}
|
return m;
|
}
|
|
public OtpMbox create() {
|
final OtpErlangPid pid = createPid();
|
final OtpMbox m = new OtpMbox(OtpNode.this, pid);
|
byPid.put(pid, new WeakReference<OtpMbox>(m));
|
return m;
|
}
|
|
public void clear() {
|
byPid.clear();
|
byName.clear();
|
}
|
|
public String[] names() {
|
String allnames[];
|
synchronized (byName) {
|
final int n = byName.size();
|
final Enumeration<String> keys = byName.keys();
|
allnames = new String[n];
|
int i = 0;
|
while (keys.hasMoreElements()) {
|
allnames[i++] = keys.nextElement();
|
}
|
}
|
return allnames;
|
}
|
|
public boolean register(final String name, final OtpMbox mbox) {
|
if (name == null) {
|
if (mbox.name != null) {
|
byName.remove(mbox.name);
|
mbox.name = null;
|
}
|
} else {
|
synchronized (byName) {
|
if (get(name) != null) {
|
return false;
|
}
|
byName.put(name, new WeakReference<OtpMbox>(mbox));
|
mbox.name = name;
|
}
|
}
|
return true;
|
}
|
|
/*
|
* look up a mailbox based on its name. If the mailbox has gone out of
|
* scope we also remove the reference from the hashtable so we don't
|
* find it again.
|
*/
|
public OtpMbox get(final String name) {
|
final WeakReference<OtpMbox> wr = byName.get(name);
|
if (wr != null) {
|
final OtpMbox m = wr.get();
|
if (m != null) {
|
return m;
|
}
|
byName.remove(name);
|
}
|
return null;
|
}
|
|
/*
|
* look up a mailbox based on its pid. If the mailbox has gone out of
|
* scope we also remove the reference from the hashtable so we don't
|
* find it again.
|
*/
|
public OtpMbox get(final OtpErlangPid pid) {
|
final WeakReference<OtpMbox> wr = byPid.get(pid);
|
if (wr != null) {
|
final OtpMbox m = wr.get();
|
if (m != null) {
|
return m;
|
}
|
byPid.remove(pid);
|
}
|
return null;
|
}
|
|
public void remove(final OtpMbox mbox) {
|
byPid.remove(mbox.self);
|
if (mbox.name != null) {
|
byName.remove(mbox.name);
|
}
|
}
|
}
|
|
/*
|
* this thread simply listens for incoming connections
|
*/
|
public class Acceptor extends Thread {
|
private final OtpServerTransport sock;
|
private final int acceptorPort;
|
private volatile boolean done = false;
|
|
Acceptor(final int port) throws IOException {
|
sock = createServerTransport(port);
|
acceptorPort = sock.getLocalPort();
|
OtpNode.this.port = acceptorPort;
|
setDaemon(true);
|
setName("acceptor");
|
publishPort();
|
start();
|
}
|
|
private boolean publishPort() throws IOException {
|
if (getEpmd() != null) {
|
return false; // already published
|
}
|
OtpEpmd.publishPort(OtpNode.this);
|
return true;
|
}
|
|
private void unPublishPort() {
|
// unregister with epmd
|
OtpEpmd.unPublishPort(OtpNode.this);
|
// close the local descriptor (if we have one)
|
closeSock(epmd);
|
epmd = null;
|
}
|
|
public void quit() {
|
unPublishPort();
|
done = true;
|
closeSock(sock);
|
localStatus(node, false, null);
|
}
|
|
private void closeSock(final OtpServerTransport s) {
|
try {
|
if (s != null) {
|
s.close();
|
}
|
} catch (final Exception e) {
|
}
|
}
|
|
private void closeSock(final OtpTransport s) {
|
try {
|
if (s != null) {
|
s.close();
|
}
|
} catch (final Exception e) {
|
}
|
}
|
|
public int port() {
|
return acceptorPort;
|
}
|
|
public void run() {
|
OtpTransport newsock;
|
OtpCookedConnection conn;
|
localStatus(node, true, null);
|
accept_loop:
|
while (!done) {
|
conn = null;
|
try {
|
newsock = sock.accept();
|
} catch (final Exception e) {
|
// Problem in java1.2.2: accept throws SocketException
|
// when socket is closed. This will happen when
|
// acceptor.quit()
|
// is called. acceptor.quit() will call localStatus(...), so
|
// we have to check if that's where we come from.
|
if (!done) {
|
localStatus(node, false, e);
|
}
|
break accept_loop;
|
}
|
try {
|
synchronized (connections) {
|
conn = new OtpCookedConnection(OtpNode.this, newsock);
|
conn.setFlags(connFlags);
|
addConnection(conn);
|
}
|
} catch (final OtpAuthException e) {
|
if (conn != null && conn.name != null) {
|
connAttempt(conn.name, true, e);
|
} else {
|
connAttempt("unknown", true, e);
|
}
|
closeSock(newsock);
|
} catch (final IOException e) {
|
if (conn != null && conn.name != null) {
|
connAttempt(conn.name, true, e);
|
} else {
|
connAttempt("unknown", true, e);
|
}
|
closeSock(newsock);
|
} catch (final Exception e) {
|
closeSock(newsock);
|
closeSock(sock);
|
localStatus(node, false, e);
|
break accept_loop;
|
}
|
} // while
|
// if we have exited loop we must do this too
|
unPublishPort();
|
}
|
}
|
}
|