Class TcpOITransport

java.lang.Object
com.nt.udc.ndk.node.OITransport
com.nt.udc.oi.transport.TcpOITransport
All Implemented Interfaces:
DataReceiverIfc, DCTransport, FileDataReceiverIfc, StateBufferSaveable, Runnable

public class TcpOITransport extends OITransport implements StateBufferSaveable
This class creates a socket on the given host/port for sending data via TCP. The data is extracted from the incoming OIRecord objects via their toByteArray() method.
The recommended use of this transport is for it to be run within its own thread so it can 'pull' the data from its DataProvider. This will allow the transport to control the flow of data it receives and stop that flow if the TCP connection is lost.
If the TCP connection is lost, the current state of the node is saved to mark the current input record being processed, should some form of recovery be needed at a later time. Meanwhile, a thread will be spawned to periodically attempt to re-establish the connection. Until the connection is made, the transport will block and stop processing and requesting data, unless the transport has been configured to drop the records while the connection is down.
If the transport is instructed to shut down while the connection is down and processing has been stopped, the current state of the node is saved. At the next startup of the transport, processing will begin where it previously stopped.
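The hold-or-drop behaviour described above can be sketched in isolation. The following is a minimal illustration only, not the transport's actual implementation: `SketchLink` and `HoldOrDropSketch` are hypothetical names, and the retry happens inline rather than in a separately spawned thread as the class description states.

```java
import java.io.IOException;

/** Hypothetical stand-in for the TCP connection; not part of the documented API. */
interface SketchLink {
    void send(byte[] data) throws IOException;
}

/** Sketch of the choice made while the connection is down: hold the record or drop it. */
class HoldOrDropSketch {
    private final SketchLink link;
    private final boolean waitForReconnect;
    private final long retryMillis;
    private int attempts;
    private int dropped;

    HoldOrDropSketch(SketchLink link, boolean waitForReconnect, long retryMillis) {
        this.link = link;
        this.waitForReconnect = waitForReconnect;
        this.retryMillis = retryMillis;
    }

    /** Returns true once the record has been sent, false if it was dropped. */
    boolean deliver(byte[] record) {
        while (true) {
            attempts++;
            try {
                link.send(record);
                return true;
            } catch (IOException connectionLost) {
                if (!waitForReconnect) {
                    dropped++;      // drop mode: give up on this record and move on
                    return false;
                }
                try {
                    Thread.sleep(retryMillis);  // hold mode: pause, then retry the same record
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;   // interrupted (for example at shutdown): stop retrying
                }
            }
        }
    }

    int attempts() { return attempts; }

    int dropped() { return dropped; }
}
```

In hold mode the caller is blocked for as long as the link keeps failing, which is what lets a pulling transport stop requesting further data from its provider.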
  • Constructor Details

    • TcpOITransport

      public TcpOITransport(OINode oinode, String host, int prt) throws NodeStartException
      Construct a new TcpOITransport to be used within the specified OINode. The transport will create a socket on the given host/port and transmit via TCP the data contained within the incoming OIRecord objects. The data will be extracted from the records via their toByteArray() method. If the socket connection cannot be established during the construction of the transport, a NodeStartException is thrown.
      Parameters:
      oinode - Reference to the OINode containing the transport
      host - The host name / IP address to which to connect
      port - The port number to which to connect
      Throws:
      NodeStartException
    • TcpOITransport

      public TcpOITransport(OINode oinode, String host, int prt, boolean wait) throws NodeStartException
      Construct a new TcpOITransport to be used within the specified OINode. The transport will create a socket on the given host/port and transmit via TCP the data contained within the incoming OIRecord objects. The data will be extracted from the records via their toByteArray() method. If the socket connection cannot be established during the construction of the transport, a NodeStartException is thrown unless the 'wait' flag is set to false. If records are to be dropped during a lost connection, this constructor will spawn a thread to periodically attempt to establish the connection while data is 'processed' and dropped. This allows the node to be started without establishing the socket connection.
      Parameters:
      oinode - Reference to the OINode containing the transport
      host - The host name / IP address to which to connect
      port - The port number to which to connect
      wait - true, if transport should hold the current record until the connection has been re-established. Otherwise, the current record will be dropped and processing will continue with the next record.
      Throws:
      NodeStartException
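The two startup policies of this constructor can be illustrated with a small stand-alone sketch. Everything here is an assumption made for illustration: `DeferredConnectSketch` is a hypothetical class, a `BooleanSupplier` stands in for the connection attempt, and `IllegalStateException` stands in for NodeStartException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Sketch of fail-fast startup (wait == true) versus deferred connection (wait == false). */
class DeferredConnectSketch {
    private final CountDownLatch connected = new CountDownLatch(1);

    DeferredConnectSketch(BooleanSupplier tryConnect, boolean wait, long retryMillis) {
        if (tryConnect.getAsBoolean()) {
            connected.countDown();
            return;
        }
        if (wait) {
            // Stand-in for NodeStartException: startup fails if the first attempt fails.
            throw new IllegalStateException("connection could not be established");
        }
        // Drop mode: let construction succeed and keep trying in the background.
        Thread retry = new Thread(() -> {
            try {
                while (!tryConnect.getAsBoolean()) {
                    Thread.sleep(retryMillis);
                }
                connected.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "reconnect-sketch");
        retry.setDaemon(true);
        retry.start();
    }

    boolean isConnected() {
        return connected.getCount() == 0;
    }

    /** Waits up to the given time for the background thread to connect. */
    boolean awaitConnected(long timeoutMillis) {
        try {
            return connected.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The retry thread is a daemon in this sketch so that it never keeps the JVM alive on its own; whether the real transport does the same is not stated in this documentation.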
  • Method Details

    • dataIsAvailable

      public void dataIsAvailable()
      This method allows the transport's DataProvider to tell the transport that data is waiting to be 'picked up'. It is then the responsibility of the transport to retrieve the data via the provider's getData() method and process it via the receiver's processData() method.
      Specified by:
      dataIsAvailable in interface DataReceiverIfc
      Specified by:
      dataIsAvailable in class OITransport
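The notify-then-pull handshake described for dataIsAvailable() can be sketched as follows. The method names dataIsAvailable, getData and processData come from this page; their signatures here, the use of String records, and the classes `PullProviderSketch` and `PullReceiverSketch` are assumptions made purely for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical provider: queues records and notifies its receiver that data is waiting. */
class PullProviderSketch {
    private final Queue<String> pending = new ArrayDeque<>();
    private PullReceiverSketch receiver;

    void attach(PullReceiverSketch receiver) {
        this.receiver = receiver;
    }

    void publish(String record) {
        pending.add(record);
        if (receiver != null) {
            receiver.dataIsAvailable();   // only a notification; no data is pushed
        }
    }

    /** Returns the next pending record, or null when nothing is waiting. */
    String getData() {
        return pending.poll();
    }
}

/** Hypothetical receiver: pulls records when notified and hands them to processData. */
class PullReceiverSketch {
    private final PullProviderSketch provider;
    final List<String> processed = new ArrayList<>();

    PullReceiverSketch(PullProviderSketch provider) {
        this.provider = provider;
        provider.attach(this);
    }

    void dataIsAvailable() {
        String record;
        while ((record = provider.getData()) != null) {
            processData(record);
        }
    }

    void processData(String record) {
        processed.add(record);
    }
}
```

Because the receiver decides when to call getData(), it can simply stop pulling while it is unable to send, which is the flow-control property the class description relies on.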
    • processData

      public void processData(DCFieldContainer record) throws NodeProcessingException
      Accept an OIRecord to be transmitted via TCP. This method will block until the data can be successfully transmitted, unless the transport is configured to drop records while the connection is lost.
      Specified by:
      processData in interface DataReceiverIfc
      Specified by:
      processData in class OITransport
      Parameters:
      record - OIRecord containing the data.
      Throws:
      NodeProcessingException
    • processData

      public void processData(DCFieldContainer[] records) throws NodeProcessingException
      Accept some OIRecord objects to be transmitted via TCP. This method will block until the data can be successfully transmitted, unless the transport is configured to drop records while the connection is lost.
      Specified by:
      processData in interface DataReceiverIfc
      Parameters:
      records - An array of OIRecord objects.
      Throws:
      NodeProcessingException
    • saveState

      public void saveState(StateBuffer buffer) throws StateException
      Save the Object's state to the given buffer.
      Specified by:
      saveState in interface StateBufferSaveable
      Parameters:
      buffer - Buffer for writing
      Throws:
      StateException
    • restoreState

      public void restoreState(StateBuffer buffer) throws StateException
      Restore the Object's state from the given buffer.
      Specified by:
      restoreState in interface StateBufferSaveable
      Parameters:
      buffer - Buffer for reading
      Throws:
      StateException
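A save/restore round trip of this kind can be sketched without the StateBuffer class, whose API is not shown on this page. The sketch below assumes a byte array as the buffer and two hypothetical state fields (`inputFile`, `recordIndex`); it uses the standard DataOutputStream and DataInputStream classes rather than the library's own types.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Sketch of a saveable state: which input file and which record were being processed. */
class TransportStateSketch {
    String inputFile = "";   // hypothetical field
    long recordIndex;        // hypothetical field

    /** Serializes the state; a byte array stands in for the StateBuffer. */
    byte[] saveState() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(inputFile);
            out.writeLong(recordIndex);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the fields back in the same order in which saveState wrote them. */
    void restoreState(byte[] saved) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(saved))) {
            inputFile = in.readUTF();
            recordIndex = in.readLong();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```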
    • run

      public void run()
      This method is used when the OITransport runs within its own thread. The method will retrieve the data from the transport's DataProvider and pass the DCFieldContainer objects on to the processData() method.
      Specified by:
      run in interface Runnable
      Specified by:
      run in class OITransport
    • shutdown

      public void shutdown()
      Stop the transport. To ensure no loss of data, this method will wait for the transport to obtain and process all pending data from its DataProvider, unless it is shutting down due to a processing error.

      If the TCP connection has been lost and processing has been stopped until the connection is re-established, an indicator will be written to the node's scratch directory, marking the position in the current NAR file. When the transport is restarted, this marker will be read and processing will resume at that point in the NAR file.

      Specified by:
      shutdown in interface DCTransport
      Specified by:
      shutdown in class OITransport
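The resume marker described above could look roughly like the following. This is a sketch under stated assumptions: the marker file name, its plain-text format, and the decision to delete the marker once it has been read are all hypothetical, and `ResumeMarkerSketch` is not part of the documented API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Sketch of writing a position marker at shutdown and reading it back at restart. */
class ResumeMarkerSketch {
    private final Path marker;

    ResumeMarkerSketch(Path scratchDir) {
        this.marker = scratchDir.resolve("resume.marker");  // hypothetical file name
    }

    /** Called at shutdown while the connection is down: record where processing stopped. */
    void writeMarker(long position) {
        try {
            Files.write(marker, Long.toString(position).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Called at startup: returns the saved position and removes the marker, or 0 if none. */
    long consumeMarker() {
        try {
            if (!Files.exists(marker)) {
                return 0L;
            }
            String text = new String(Files.readAllBytes(marker), StandardCharsets.UTF_8).trim();
            Files.delete(marker);
            return Long.parseLong(text);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Small demonstration using a temporary directory as the scratch directory. */
    static long[] demo() {
        try {
            Path scratch = Files.createTempDirectory("scratch-sketch");
            new ResumeMarkerSketch(scratch).writeMarker(4096L);       // shutdown
            ResumeMarkerSketch atRestart = new ResumeMarkerSketch(scratch);
            long first = atRestart.consumeMarker();                   // restart
            long second = atRestart.consumeMarker();                  // marker already used
            Files.delete(scratch);
            return new long[] {first, second};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Deleting the marker after it is read keeps a later clean restart from resuming at a stale position; this is a design choice of the sketch, not documented behaviour.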
    • isHealthy

      public boolean isHealthy()
      Indicates whether the transport is fully functional
      Specified by:
      isHealthy in interface DCTransport
      Overrides:
      isHealthy in class OITransport
      Returns:
      true if the transport is healthy, false otherwise
    • isConnected

      public boolean isConnected()
      Indicates whether this TcpOITransport is connected to the TCP socket
    • isDataAvailable

      public boolean isDataAvailable()
      Indicates whether data is available from the transport's DataProvider
    • isRunning

      public boolean isRunning()
      Indicates whether this TcpOITransport is running
    • setRunning

      protected void setRunning(boolean run)
      Sets the boolean which indicates whether the transport is running
    • isProcessingData

      public boolean isProcessingData()
      Indicates whether this TcpOITransport is currently processing data
    • setProcessingData

      protected void setProcessingData(boolean value)
      Sets the boolean indicating whether the transport is currently processing data.
    • isWaitForReconnect

      public boolean isWaitForReconnect()
      Indicates whether this TcpOITransport is configured to, in the event of a lost connection, wait for the connection to be re-established before sending the data. The default is to wait.
      Returns:
      true, if transport will hold the current record until the connection has been re-established. Otherwise, the current record will be dropped and processing will continue with the next record.
    • setWaitForReconnect

      public void setWaitForReconnect(boolean value)
      Sets the boolean indicating whether the transport should, in the event of a lost connection, wait for the connection to be re-established before sending the current record or drop the current record and move on to processing the next record.
    • connect

      protected void connect() throws IOException
      Create the socket and attempt to establish a connection.
      Throws:
      IOException
    • write

      protected void write(byte[] bytes) throws IOException
      Write the given bytes to the socket's output stream.
      Parameters:
      bytes - Bytes to be written to the socket's output stream.
      Throws:
      IOException
    • close

      protected void close()
      Close the socket and its output stream.
    • reconnect

      protected void reconnect()
      Attempt to re-establish the connection.
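The four protected socket methods above map naturally onto java.net.Socket. The following sketch shows one plausible shape for them; `SocketSenderSketch` is a hypothetical class, the method bodies are assumptions rather than the transport's actual code, and the `demo()` helper uses a loopback server socket purely to exercise the sketch.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Sketch of connect / write / close / reconnect on top of java.net.Socket. */
class SocketSenderSketch {
    private final String host;
    private final int port;
    private Socket socket;
    private OutputStream out;

    SocketSenderSketch(String host, int port) {
        this.host = host;
        this.port = port;
    }

    void connect() throws IOException {
        socket = new Socket(host, port);
        out = socket.getOutputStream();
    }

    void write(byte[] bytes) throws IOException {
        out.write(bytes);
        out.flush();
    }

    void close() {
        try {
            if (out != null) out.close();
            if (socket != null) socket.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if closing fails.
        } finally {
            out = null;
            socket = null;
        }
    }

    void reconnect() throws IOException {
        close();
        connect();
    }

    /** Sends one payload to a loopback server and returns what the server received. */
    static String demo() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            SocketSenderSketch sender =
                    new SocketSenderSketch(loopback.getHostAddress(), server.getLocalPort());
            sender.connect();
            try (Socket peer = server.accept()) {
                sender.write("record-1".getBytes(StandardCharsets.UTF_8));
                sender.close();  // closing signals end-of-stream to the reader
                ByteArrayOutputStream received = new ByteArrayOutputStream();
                InputStream in = peer.getInputStream();
                byte[] chunk = new byte[256];
                int n;
                while ((n = in.read(chunk)) != -1) {
                    received.write(chunk, 0, n);
                }
                return new String(received.toByteArray(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Note that a successful write() only means the bytes were handed to the local TCP stack; a lost connection is typically detected on a later write, which is one reason a transport may want to remember the record it was processing.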