After my post a few days ago about analyzing Apache log files with Riak, I thought I would follow that up by showing how to do the same thing using Hadoop. I am not going to cover how to install Hadoop; I am going to assume you already have it installed. What is it they say about assumptions? Also, any Hadoop commands are executed relative to the directory where Hadoop is installed ($HADOOP_HOME).
As before, the first thing to do is to get the log data into the system where it can be analyzed, which in this case means the Hadoop Distributed File System (HDFS). I copied all of my log files into /tmp/logs on the server where I was running Hadoop. To import the data into HDFS, run the following command:
./bin/hadoop dfs -put /tmp/logs /var/logs
This copies all the files from the directory /tmp/logs on the local file system into the location /var/logs in HDFS. Remember: the destination location is the location in HDFS and does NOT correspond to a directory in the local file system. To see if the log files were copied, run the following:
./bin/hadoop dfs -ls /var/logs
You should see a list of the files you just imported.
Just like with Riak, we need two classes: a Mapper and a Reducer. The Mapper class contains a map function, which is called for each input – in this case, each log file. The Reducer class contains a reduce function, which is called once for each key, and takes as input the intermediate results from the map phases.
The code for the Mapper class looks like this:
public static class LogEntryMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text url = new Text();
    // Matches the URL that follows a GET or POST in each log entry.
    private Pattern p = Pattern.compile("(?:GET|POST)\\s([^\\s]+)");

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] entries = value.toString().split("\r?\n");
        for (int i = 0, len = entries.length; i < len; i++) {
            Matcher matcher = p.matcher(entries[i]);
            if (matcher.find()) {
                // Emit (url, 1) for each matching log entry.
                url.set(matcher.group(1));
                context.write(url, one);
            }
        }
    }
}
The map function takes the text from each log file and splits it at the newline character to get an array of individual log entries. The URL is then extracted from each entry and a count of 1 is assigned to each URL instance. The reduce function takes these individual counts and adds them together to get a total for each key (i.e. each URL). The Reducer class looks like this:
public static class LogEntryReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable total = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the individual counts emitted for this URL.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        total.set(sum);
        context.write(key, total);
    }
}
The main() method sets up the map-reduce job and starts it:
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    if (args.length != 2) {
        System.err.println("Usage: loganalyzer <in> <out>");
        System.exit(2);
    }

    Job job = new Job(conf, "analyze log");
    job.setJarByClass(LogAnalyzer.class);
    job.setMapperClass(LogEntryMapper.class);
    job.setReducerClass(LogEntryReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
As I mentioned in my previous article, it’s possible with Hadoop to specify a combiner class to perform a reduce-type operation after each map phase. If you want to do that here, you will need to add the following line to the job definition:
job.setCombinerClass(LogEntryReducer.class);
Now that the data is available to analyze, the map-reduce job can be started from a local JAR file:
./bin/hadoop jar loganalyzer.jar loganalyzer /var/logs /var/logs-output
(If you are following along at home, you will need to download loganalyzer.jar.)
This tells the map-reduce job to take all the files in /var/logs as input and output the results of the job to /var/logs-output (in HDFS). To copy the results locally, once the job is complete, execute the following:
./bin/hadoop dfs -getmerge /var/logs-output /tmp/logs-output
If you open up the file in /tmp/logs-output, you should see something like this:
simonbuckle.com/feed/atom	11
simonbuckle.com/index.php	13
simonbuckle.com/robots.txt	2
...
That’s it! The results could be tidied up a bit by removing the hostname, as it doesn’t add anything, but other than that it does the job.
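If you wanted to do that tidy-up, one option would be to trim each matched URL down to just its path before it is written out of the map function. The helper below is only a sketch of that idea; it is hypothetical and not part of the original code:

// Hypothetical helper: reduces "simonbuckle.com/feed/atom" to "/feed/atom".
// It could be applied to matcher.group(1) before calling url.set(...) in map().
private static String stripHostname(String url) {
    int slash = url.indexOf('/');
    // If there is no '/', or the URL already starts with one, leave it unchanged.
    return (slash > 0) ? url.substring(slash) : url;
}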
The code that defines the Mapper and Reducer classes is here – I have put everything in one file to make it easier to read. The code for the driver is here. All code was compiled against Hadoop version 0.20.203.0.
As before, feel free to leave any comments.
If your Apache log files are really large, it will take a very long time to copy them to HDFS. What solution do you propose to avoid this? Do you think a background thread that periodically copies the Apache logs to HDFS is a good idea?
One option could be to rotate the logs more frequently, which will result in smaller logs.
If a new log file is added to HDFS, we have to run this program again.
What strategy does Hadoop use to distribute the files to the various nodes? Is an entire file sent to a single mapper, or could the file be chunked by size or by number of lines?
What if some files are much bigger than others?
It depends on the input splits of the InputFormat that is being used.
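To expand on that: with the default TextInputFormat, the input is divided into splits of roughly one HDFS block each, so a large file can be processed by several mappers in parallel, while a small file typically maps to a single split. If you wanted to force each file to go to a single mapper, one possible approach (a sketch, not something the article's code does) is to use an input format that is marked as non-splittable:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Keeps each input file in a single split so that one mapper handles the whole file.
public class NonSplittableTextInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
}

The job would then register it with job.setInputFormatClass(NonSplittableTextInputFormat.class) in main().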