Использование Apache Mahout для кластеризации данных на Hadoop платформе

Как мы все знаем у организации Apache очень много проектов. Apache Foundation - это настоящий инкубатор IT проектов. Все знают Apache прежде всего по их веб-серверу. Однако кроме веб-сервера Apache продвигает много других очень полезных проектов. Один из таких перспективных проектов это Apache Hadoop. Apache Hadoop позволяет делать распределенные вычисления. Зачем это нужно? В первую очередь для обработки больших объемов данных. Сейчас есть такой современный тренд - Big Data - это большие данные. Но насколько большие? На этот вопрос все отвечают по разному. Для одной организации петабайт может являться в принципе нормальным объемом данных, а для другой это уже проблема больших данных. Если копнуть глубже то тут возникает целый ряд проблем: хранение, поиск, аналитика, и визуализация. Проблема хранения решается в принципе просто посредством какой-нибудь распределенной СУБД. Более сложно дело обстоит с поиском нужной информации, аналитикой и визуализацией. Вот как раз таки Apache Mahout и поможет нам программировать аналитику. Итак что мы имеем для решения проблемы больших данных:
  • хранение - распределенная СУБД
  • процессинг (обработка данных) - Apache Hadoop (установка)
  • аналитика - Apache Mahout

Т.к. кроме Apache Mahout у меня всё уже поставлено более менее нормально, то в этом посте рассмотрим установку Apache Mahout и в качестве теста запустим простой пример кластеризации данных. Так чтобы не заниматься болтовней сразу качаем дистрибутив http://apache.osuosl.org//mahout/0.5/mahout-distribution-0.5.tar.gz. Если кто не знает как распаковывать .tar.gz то дайте команду:

tar -zxvf mahout-distribution-0.5.tar.gz

Смотрим в содержимое:

Вот некоторые из этих *.jar файлов как раз и используются в качестве библиотек для java программ которые я буду писать (может быть) в дальнейшем. Также там в папке lib есть много дополнительных библиотек. Структура папки в которую установлен Hadoop (${HADOOP_HOME}) примерно такая же. Тоже есть в корне jar файлы и папка lib тоже имеется. Так вот в папку lib (эта уже хадуповская) собираются все библиотеки которые будут использоваться распределено работающими программами. А если вы возьмете какой-нибудь java исходник махаутовской программы то там в начале будет подключение библиотек типа такого:

import org.apache.mahout.clustering.kmeans.Cluster;
import org.apache.mahout.clustering.kmeans.KMeansDriver;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

Недавно мне надо было подключить библиотеку Apache Lucene и я просто соответствующие jar файлы из ее дистрибутива закинул в папку lib от хадупа и программа нормально запустилась. Попробую сделать по этой же схеме и для махаута.


Это нормальный способ. Единственное что неудобно так это то, что надо на каждый хост в кластере копировать эти библиотеки. Для теста я возьму простую реализацию алгоритма кластеризации данных из книги издательства Manning "Mahout in Action":

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.mahout.clustering.WeightedVectorWritable;
import org.apache.mahout.clustering.kmeans.Cluster;
import org.apache.mahout.clustering.kmeans.KMeansDriver;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class SimpleKMeansClustering {
  public static final double[][] points = { {1, 1}, {2, 1}, {1, 2},
                                           {2, 2}, {3, 3}, {8, 8},
                                           {9, 8}, {8, 9}, {9, 9}};
  
  public static void writePointsToFile(List<Vector> points,
                                       String fileName,
                                       FileSystem fs,
                                       Configuration conf) throws IOException {
    Path path = new Path(fileName);
    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
        path, LongWritable.class, VectorWritable.class);
    long recNum = 0;
    VectorWritable vec = new VectorWritable();
    for (Vector point : points) {
      vec.set(point);
      writer.append(new LongWritable(recNum++), vec);
    }
    writer.close();
  }
  
  public static List<Vector> getPoints(double[][] raw) {
    List<Vector> points = new ArrayList<Vector>();
    for (int i = 0; i < raw.length; i++) {
      double[] fr = raw[i];
      Vector vec = new RandomAccessSparseVector(fr.length);
      vec.assign(fr);
      points.add(vec);
    }
    return points;
  }
  
  public static void main(String args[]) throws Exception {
    
    int k = 2;
    
    List<Vector> vectors = getPoints(points);
    
    File testData = new File("testdata");
    if (!testData.exists()) {
      testData.mkdir();
    }
    testData = new File("testdata/points");
    if (!testData.exists()) {
      testData.mkdir();
    }
    
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    writePointsToFile(vectors, "testdata/points/file1", fs, conf);
    
    Path path = new Path("testdata/clusters/part-00000");
    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
        path, Text.class, Cluster.class);
    
    for (int i = 0; i < k; i++) {
      Vector vec = vectors.get(i);
      Cluster cluster = new Cluster(vec, i, new EuclideanDistanceMeasure());
      writer.append(new Text(cluster.getIdentifier()), cluster);
    }
    writer.close();
    
    KMeansDriver.run(conf, new Path("testdata/points"), new Path("testdata/clusters"),
      new Path("output"), new EuclideanDistanceMeasure(), 0.001, 10,
      true, false);
    
    SequenceFile.Reader reader = new SequenceFile.Reader(fs,
        new Path("output/" + Cluster.CLUSTERED_POINTS_DIR
                 + "/part-m-00000"), conf);
    
    IntWritable key = new IntWritable();
    WeightedVectorWritable value = new WeightedVectorWritable();
    while (reader.next(key, value)) {
      System.out.println(value.toString() + " belongs to cluster "
                         + key.toString());
    }
    reader.close();
  }
  
}

Итак начнем.
1) Из дистрибутива (mahout-distribution-0.5.tar.gz) копируем нижеприведенный список файлов в папку ${HADOOP_HOME}/lib/
  • mahout-core-0.5.jar
  • mahout-math-0.5.jar
  • mahout-collections-1.0.jar
  • google-collections-1.0-rc2.jar
  • commons-cli-2.0-mahout.jar
2) Создаем примерно такую структуру проекта:
  • playground/SimpleKMeansClustering/classes/
  • playground/SimpleKMeansClustering/src/
  • playground/SimpleKMeansClustering/src/SimpleKMeansClustering.java
  • playground/SimpleKMeansClustering/build.sh
  • playground/SimpleKMeansClustering/run.sh
3) В файле SimpleKMeansClustering.java размещаем приведенный выше код кластеризации данных.

4) В файле playground/SimpleKMeansClustering/build.sh пишем следующий скрипт:

rm -rf ./classes/*

javac -classpath ${HADOOP_HOME}/hadoop-common-0.21.0.jar:${HADOOP_HOME}/lib/mahout-core-0.5.jar
:${HADOOP_HOME}/lib/mahout-math-0.5.jar -d ./classes ./src/SimpleKMeansClustering.java

rm -rf ./SimpleKMeansClustering.jar

jar -cvf ./SimpleKMeansClustering.jar -C ./classes/ .

Переходим в папку playground/SimpleKMeansClustering/ и запускаем ./build.sh. Что дает нам на выходе в той же папке SimpleKMeansClustering.jar, который мы будем запускать на хадупе.

5) В файле playground/SimpleKMeansClustering/run.sh пишем следующий скрипт:

${HADOOP_HOME}/bin/hadoop dfs -rmr /user/hadoop/testdata

${HADOOP_HOME}/bin/hadoop jar ./SimpleKMeansClustering.jar SimpleKMeansClustering 
-libjars ${HADOOP_HOME}/lib/commons-lang-2.5.jar

Из папки playground/SimpleKMeansClustering/ запускаем ./run.sh. И получаем результат: Что в принципе соответствует книжной иллюстрации: 



Вроде все просто. По-быстрому поставили, запустили простейшую программу и получили результаты.