Java Http Server to Stream Logs

Streaming logs is a technique you can use when you need to know more about your application's performance or usage.
In this post I would like to show how to develop a Java-based HTTP server which you can use to stream logs.
Code for the application can be found here.

Update: I recently ported this to Scala. Take a look here if that interests you.

Overview

The concept is depicted in the diagram below.

Java Log Server

Below are the important components of the application:

  • LogServer – Server which streams logs
  • LogWatcher – Thread to monitor logs
  • MessageQueue – Stores Messages to be sent to client
  • MessageListener – Interface to listen for messages
  • ConnectionWatcher – Thread to monitor connection
  • ConnectionHandler – Thread which streams response to client

Let's look at each of them in detail.

LogServer

This is an HTTP server which accepts connections from clients. When the server is started, it performs the following functions:

  • Starts LogWatcher
  • Starts ConnectionWatcher
  • Waits for connection request

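When a connection request arrives, it

  • Creates a ConnectionHandler
  • Adds the ConnectionHandler to the listeners in MessageQueue
  • Submits the ConnectionHandler to the executor
  • Registers the channel with ConnectionWatcher

Below is the relevant section of the code: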

    public void startServer() throws IOException {
        System.out.println("Starting server...");
        // Background threads: one tails the log file, one prunes closed connections.
        executor.execute(logWatcher);
        executor.execute(connectionWatcher);
        while (keepRunning) {
            // Blocks until a client connects.
            SocketChannel socketChannel = serverSocketChannel.accept();
            handleConnection(socketChannel);
        }
    }

    private void handleConnection(final SocketChannel channel) {
        String client = "Address=" + channel.socket().getInetAddress()
                + ", port=" + channel.socket().getPort();
        try {
            System.out.println("Handling request for client. " + client);
            ConnectionHandler connectionHandler = new ConnectionHandler(channel);
            // Register the handler as a listener before its thread starts consuming.
            messageQueue.addListener(connectionHandler);
            executor.execute(connectionHandler);
            connectionWatcher.addConnection(channel);
            System.out.println("Handling request for client complete");
        } catch (IOException ioex) {
            System.out.println("Exception handling request for client. " + client);
        }
    }
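
The excerpt above uses serverSocketChannel without showing how it is created. Below is a minimal sketch of one way to obtain a blocking channel that accept() can wait on. The class ServerSetupSketch and the method openServerChannel are hypothetical names used only for illustration, and the demo binds to port 0 (any free port) so that it runs anywhere; a real server would more likely bind to a fixed port such as the 8080 used in the curl command later in this post.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical helper for illustration; this is not the LogServer class from the post.
class ServerSetupSketch {

    // Opens a server channel bound to the given port; port 0 asks the OS for any free port.
    static ServerSocketChannel openServerChannel(int port) throws IOException {
        ServerSocketChannel channel = ServerSocketChannel.open();
        channel.bind(new InetSocketAddress(port));
        // Channels start in blocking mode, so accept() waits until a client connects.
        return channel;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel server = openServerChannel(0)) {
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            // Connect a local client so that accept() returns immediately in this demo.
            try (SocketChannel client = SocketChannel.open(
                         new InetSocketAddress(InetAddress.getLoopbackAddress(), port));
                 SocketChannel accepted = server.accept()) {
                System.out.println("Accepted connection from " + accepted.getRemoteAddress());
            }
        }
    }
}
```

In a server like the one described here, the accepted SocketChannel would then be passed on to something like handleConnection.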

LogWatcher

This component monitors the log file for changes. It performs the following functions:

  • Monitors the size of the file to detect changes.
  • Adds new lines to the MessageQueue.
  • Uses RandomAccessFile, so the entire file is not scanned each time.

Below is the relevant section of the code:


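    public void run() {
        System.out.println("Running LogWatcher....");
        System.out.println("LogWatcher Thread - " + Thread.currentThread().getName());
        long origSize = 0;
        while (keepRunning) {
            try {
                // Read the length once so the comparison and the stored size agree.
                long currentSize = logFile.length();
                if (currentSize > origSize) {
                    addLinesToQueue();
                }
                origSize = currentSize;
                // Sleep after each batch of 100 messages. Note: when msgCount is not
                // a multiple of 100, the loop polls again without sleeping.
                if ((msgCount % 100) == 0) {
                    Thread.sleep(20000);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop polling.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }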

    private void addLinesToQueue() {
        // try-with-resources closes the file even if reading fails.
        try (RandomAccessFile raf = new RandomAccessFile(logFile, "r")) {
            // Assumes bytesToSkip is declared as a long holding the byte offset already read.
            raf.seek(bytesToSkip);
            String line;
            while ((line = raf.readLine()) != null) {
                if (line.length() >= 4 && line.substring(0, 4).matches(lineStartPattern)) {
                    messageQueue.addToQueue(line);
                    msgCount++;
                }
            }
            // getFilePointer() also counts line terminators, which summing
            // line.getBytes().length would miss.
            bytesToSkip = raf.getFilePointer();
        } catch (FileNotFoundException fnex) {
            System.out.println("cannot locate log file");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
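
To make the idea of reading only the new part of the file concrete, here is a small stand-alone sketch. It is not the LogWatcher from this post: the class LogTailSketch, its position field and the readNewLines method are hypothetical names, and the sketch leaves out the line-start pattern filter. It remembers the byte offset reached on the previous read and seeks straight to it the next time.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone reader for illustration; not the post's LogWatcher.
class LogTailSketch {
    private final File logFile;
    private long position = 0; // byte offset of the first unread byte

    LogTailSketch(File logFile) {
        this.logFile = logFile;
    }

    // Returns the lines appended to the file since the previous call.
    List<String> readNewLines() throws IOException {
        List<String> lines = new ArrayList<>();
        try (RandomAccessFile raf = new RandomAccessFile(logFile, "r")) {
            if (raf.length() < position) {
                position = 0; // file was truncated or replaced, so start from the beginning
            }
            raf.seek(position);
            String line;
            while ((line = raf.readLine()) != null) {
                lines.add(line);
            }
            // The file pointer includes line terminators, so the offset stays exact.
            position = raf.getFilePointer();
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        File file = File.createTempFile("tail-sketch", ".log");
        file.deleteOnExit();
        LogTailSketch tail = new LogTailSketch(file);

        Files.write(file.toPath(), "first\nsecond\n".getBytes(StandardCharsets.US_ASCII));
        System.out.println(tail.readNewLines()); // prints [first, second]

        Files.write(file.toPath(), "third\n".getBytes(StandardCharsets.US_ASCII),
                StandardOpenOption.APPEND);
        System.out.println(tail.readNewLines()); // prints [third]
    }
}
```

One limitation of this sketch: if the logging application has written only part of a line when the file is read, that partial line is returned as if it were complete.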

MessageQueue

This object stores messages which are to be sent to all listeners. Internally it uses

  • An unbounded LinkedBlockingQueue to store messages
  • A ConcurrentHashMap to store listeners

It notifies listeners when the number of messages in the queue reaches the threshold (100 in this case).
Below is the relevant section of the code:

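    public void addToQueue(String msg) {
        // Only queue messages while at least one listener is registered.
        if (listeners.size() > 0) {
            try {
                msgQueue.put(msg);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the calling thread can react to it.
                Thread.currentThread().interrupt();
            }
        }

        // Notify listeners once the batch threshold is reached.
        if (msgQueue.size() == 100) {
            System.out.println("notifying listeners start");
            notifyListeners();
            System.out.println("notifying listeners end");
        }
    }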
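
The excerpt does not show notifyListeners, so here is one possible shape for it, as a sketch only. MessageQueueSketch is a hypothetical reduced class, not the MessageQueue from this post. It assumes that listeners are keyed by the value of getName(), that notifying means draining the whole queue and handing every message to every listener, and that the threshold is passed in through the constructor instead of being fixed at 100.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Same two methods as the MessageListener interface described in this post.
interface MessageListener {
    void onMessage(String message);
    String getName();
}

// Hypothetical reduced queue for illustration; not the post's MessageQueue.
class MessageQueueSketch {
    private final int threshold;
    private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
    private final Map<String, MessageListener> listeners = new ConcurrentHashMap<>();

    MessageQueueSketch(int threshold) {
        this.threshold = threshold;
    }

    void addListener(MessageListener listener) {
        listeners.put(listener.getName(), listener);
    }

    void removeListener(String name) {
        listeners.remove(name);
    }

    void addToQueue(String msg) {
        if (listeners.isEmpty()) {
            return; // nobody is listening, so the message is dropped
        }
        msgQueue.add(msg); // the queue is unbounded, so add() does not block
        if (msgQueue.size() >= threshold) {
            notifyListeners();
        }
    }

    // Assumption: drain the batch and deliver every message to every listener.
    private void notifyListeners() {
        List<String> batch = new ArrayList<>();
        msgQueue.drainTo(batch);
        for (MessageListener listener : listeners.values()) {
            for (String message : batch) {
                listener.onMessage(message);
            }
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        MessageQueueSketch queue = new MessageQueueSketch(3);
        queue.addListener(new MessageListener() {
            @Override public void onMessage(String message) { received.add(message); }
            @Override public String getName() { return "demo-listener"; }
        });
        queue.addToQueue("m1");
        queue.addToQueue("m2");
        System.out.println(received); // prints [] because the threshold is not reached yet
        queue.addToQueue("m3");
        System.out.println(received); // prints [m1, m2, m3]
    }
}
```

Batching like this keeps the number of notifications low, at the cost of messages waiting in the queue until the threshold is reached.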

MessageListener

Implementations of this interface listen for changes in MessageQueue and handle those messages. In this particular example, ConnectionHandler implements this interface and listens for messages.
This also illustrates the Observer pattern, in which a set of registered listeners is notified of changes.
Below is the relevant section of the code:

    public interface MessageListener {
        void onMessage(String message);
        String getName();
    }

ConnectionHandler

This object handles responses for a client. For every request, the server instantiates a new ConnectionHandler.
ConnectionHandler implements two interfaces:

  • Runnable – to write to the connection's output stream
  • MessageListener – to listen for new messages from MessageQueue

To write output, it uses a PrintWriter. When creating the PrintWriter we set autoflush to true, so each println call flushes data to the output stream. Also notice that the PrintWriter is kept open and is closed only when the thread is stopped.
Below is the relevant section of the code:

    public void run() {
        try {
            System.out.println("RequestHandler Thread - " + Thread.currentThread().getName());
            while (keepRunning) {
                // Blocks until onMessage() delivers a line.
                String line = messagesQueue.take();
                if (line.length() > 0) {
                    pw.println(line);
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag; the finally block still closes the writer.
            Thread.currentThread().interrupt();
        } finally {
            // Close the writer however the loop ends.
            pw.close();
        }
    }

    @Override
    public void onMessage(String message) {
        try {
            messagesQueue.put(message);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
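
The effect of the autoflush flag is easy to check in isolation. The sketch below is not part of the application: AutoFlushSketch and bytesVisibleAfterPrintln are hypothetical names, and a ByteArrayOutputStream stands in for the socket's output stream. It counts how many bytes have reached the underlying stream right after a println call.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;

// Hypothetical demo for illustration; not part of the post's ConnectionHandler.
class AutoFlushSketch {

    // Returns how many bytes have reached the underlying stream right after println().
    static int bytesVisibleAfterPrintln(boolean autoFlush) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream(); // stands in for the socket stream
        PrintWriter pw = new PrintWriter(sink, autoFlush);
        pw.println("log line");
        int visible = sink.size(); // measured before any explicit flush or close
        pw.close();
        return visible;
    }

    public static void main(String[] args) {
        // With autoflush the line and its terminator are already in the stream.
        System.out.println("autoflush on:  " + bytesVisibleAfterPrintln(true));
        // Without autoflush the text is still sitting in the writer's buffer.
        System.out.println("autoflush off: " + bytesVisibleAfterPrintln(false));
    }
}
```

For a streaming server this matters because a client such as curl only sees a line once it has actually been written to the socket.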

ConnectionWatcher

This thread monitors connections and notifies MessageQueue when a connection is closed (MessageQueue then removes the handler from its list of listeners).
The state methods available on sockets don’t report whether the remote client has closed the connection. One way to determine this is to read from the SocketChannel: read returns -1 once the client has closed the connection, and an IOException is thrown if the connection has failed. (This is the reason we are using SocketChannel.)

Below is the relevant section of the code:


    public void run() {
        while (keepRunning) {
            System.out.println("Running ConnectionWatcher...");
            for (SocketChannel socketChannel : connectionQueue) {
                // MessageQueue.removeListener is called with this address:port string.
                String client = socketChannel.socket().getInetAddress() + ":"
                        + socketChannel.socket().getPort();
                if (!isConnectionOpen(socketChannel)) {
                    System.out.println("Removing =" + client);
                    connectionQueue.remove(socketChannel);
                    messageQueue.removeListener(client);
                } else {
                    System.out.println("Connection =" + client + " is open.");
                }
            }
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop watching.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    private boolean isConnectionOpen(SocketChannel connection) {
        try {
            ByteBuffer buffer = ByteBuffer.allocate(24);
            int val = connection.read(buffer);
            System.out.println("Is connection Open , Val =" + val);
            // read() returns -1 once the client has closed the connection.
            return val != -1;
        } catch (IOException e) {
            // A failed read also means the connection is no longer usable.
            e.printStackTrace();
            return false;
        }
    }
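
The read-based check can be tried out on its own with a loopback connection. The sketch below is a hypothetical demo (ConnectionProbeSketch and probeAfterClientClose are names made up for illustration): it connects a client to a local server channel, closes the client, and then probes the server side of the connection. It assumes blocking channels, which also means the probe would wait on a client that is connected but silent; a non-blocking channel would avoid that, but it is outside the scope of this sketch.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo for illustration; not the post's ConnectionWatcher.
class ConnectionProbeSketch {

    // True if data was read; false on end-of-stream (-1) or an I/O error.
    static boolean isConnectionOpen(SocketChannel connection) {
        try {
            ByteBuffer buffer = ByteBuffer.allocate(24);
            return connection.read(buffer) != -1;
        } catch (IOException e) {
            return false;
        }
    }

    // Connects a loopback client, closes it, then probes the server side.
    static boolean probeAfterClientClose() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            SocketChannel client = SocketChannel.open(server.getLocalAddress());
            try (SocketChannel accepted = server.accept()) {
                client.close(); // orderly close by the client
                return isConnectionOpen(accepted);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("open after client close? " + probeAfterClientClose()); // prints false
    }
}
```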

Running the application

1. Start LoggingApp. This is a simple Java program which generates logs using log4j (pass log4j.xml as a java -D parameter).
2. Start LoggingServer (pass the log file as a parameter).
3. Open one or more terminals and type curl http://localhost:8080/ to see the log lines in the terminal.

Use cases

Below are some use cases:

1. Monitor application performance/usage in real time. Heroku recently released log2viz for their apps, which is a similar concept.
2. Stream data to Hadoop or other ML systems to facilitate predictive analysis.

Conclusion

From this application/prototype, we can learn the following concepts:

1. Use of concurrent collections (you don’t have to worry about synchronized blocks or locks).
2. Use of the Executor framework and the Producer-Consumer pattern.
3. Use of the Observer pattern.
4. Socket programming.
