pageCache

1. What is the page cache?

The page cache is the main disk cache implemented by the Linux kernel. Its purpose is to reduce disk I/O: data read from disk is cached in physical memory, so that subsequent accesses to the disk become accesses to memory.
The page cache is made up of physical pages in RAM, and each page in the cache maps to several blocks on disk. Whenever the kernel starts a page I/O operation it looks in the cache first, which greatly reduces the number of disk operations.
    Note that the "blocks" here are not physical blocks; they are the storage units abstracted by the file system. The block size differs between file systems and is configurable, commonly 1 KB to 4 KB.
    The real physical unit is the sector, the smallest addressable unit of the disk, traditionally 512 bytes (0.5 KB).
    A page-cache page is 4 KB by default (the memory page size).

    The Linux kernel also requires that Block_Size = Sector_Size * 2^n, and that Block_Size <= the memory Page_Size.
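
To see these three sizes on a real machine you can query them from the shell. A quick sketch (run as root; /dev/sda and /dev/sda1 are example device names, adjust them for your system):

    blockdev --getss /dev/sda     # sector size of the disk, usually 512 bytes
    blockdev --getbsz /dev/sda1   # file-system block size of a partition, e.g. 1K or 4K
    getconf PAGESIZE              # memory page size, usually 4096 bytes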

2. Page cache and buffer cache

To speed up file reads and writes, the Linux kernel provides the page cache.
To speed up reads and writes on block devices, the kernel also provides the buffer cache.
In the 2.4 kernel the two were separate.

This led to double buffering, because file I/O is ultimately turned into block-device I/O.
In 2.6 the buffer cache was merged into the page cache, and the corresponding pages are called buffer pages.
When a file is read or written, if its blocks are laid out contiguously on disk, its pages in the page cache are ordinary pages; if the blocks are not contiguous, or the file is a device file, its pages are buffer pages. Compared with an ordinary page, a buffer page carries a few extra buffer_head structures (how many depends on the block size). Likewise, if an individual block (such as the superblock) is read or written directly, the corresponding page-cache page is also a buffer page. Although the two kinds of pages differ slightly in form, their data ends up wrapped in bio structures and submitted to the generic block layer, where I/O scheduling is handled uniformly.
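
Both caches can be watched from user space. As a rough illustration, the buff/cache column of free and the Buffers/Cached fields in /proc/meminfo grow as files are read (the file path below is just an example):

    free -h                                    # buff/cache = buffer cache + page cache
    grep -E '^(Buffers|Cached)' /proc/meminfo  # buffer pages vs. ordinary cached pages
    cat /path/to/some/large/file > /dev/null   # read a file once...
    free -h                                    # ...and the cached figure grows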

HDFS Large-File Upload Tool

1. Background

The cluster I built in virtual machines has no FTP service, and rz fails when uploading larger files, so I wrote a small upload tool on top of the HDFS Java API.

2. Download

Download

3. What it looks like

(screenshot of the upload tool)

4. Implementation

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import javax.swing.*;
import java.awt.*;
import java.awt.event.*;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class HdfsUploadTool extends JPanel {
    private static final int WIDTH = 500;
    private static final int HEIGHT = 200;
    private JFrame mainFrame;

    public HdfsUploadTool() {
        // Create the main window and centre it on the screen
        mainFrame = new JFrame("HDFS大文件上传工具 for Hadoop 2.6.1-QQ:171418298  maoxiangyi.cn");
        mainFrame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
        GridBagLayout layout = new GridBagLayout();
        setLayout(layout);
        mainFrame.add(this, BorderLayout.WEST);
        mainFrame.setSize(WIDTH, HEIGHT);
        Toolkit kit = Toolkit.getDefaultToolkit();
        Dimension screenSize = kit.getScreenSize();
        int width = screenSize.width;
        int height = screenSize.height;
        int x = (width - WIDTH) / 2;
        int y = (height - HEIGHT) / 2;
        mainFrame.setLocation(x, y);

        // Build the form components
        final JLabel addressLabel = new JLabel("  集群地址:");
        this.add(addressLabel);
        final JTextField addressText = new JTextField(12);
        addressText.setText("hdp-1.yjy.itcast.local");
        this.add(addressText);
        final JLabel accountLabel = new JLabel("  账户名称:");
        this.add(accountLabel);
        final JTextField accountText = new JTextField(8);
        accountText.setText("hadoop");
        this.add(accountText);
        final JLabel replicationLabel = new JLabel("  副本数:");
        this.add(replicationLabel);
        final JTextField replicationText = new JTextField(5);
        replicationText.setText("2");
        this.add(replicationText);


        final JLabel label = new JLabel("  选择文件:");
        this.add(label);
        final JTextField selectPath = new JTextField(5);
        this.add(selectPath);
        JButton viewButton = new JButton("选择");
        this.add(viewButton);

        final JLabel toPathLabel = new JLabel("  HDFS路径:");
        this.add(toPathLabel);
        final JTextField toPathText = new JTextField(5);
        toPathText.setText("/");
        this.add(toPathText);
        JButton uploadButton = new JButton("上传");
        this.add(uploadButton);

        final JLabel tips = new JLabel("  运行状态:");
        this.add(tips);
        final JTextArea txtDisplay = new JTextArea();
        JScrollPane resultArea = new JScrollPane(txtDisplay);
        resultArea.setHorizontalScrollBarPolicy(JScrollPane.HORIZONTAL_SCROLLBAR_AS_NEEDED);
        resultArea.setVerticalScrollBarPolicy(JScrollPane.VERTICAL_SCROLLBAR_AS_NEEDED);
        this.add(resultArea);


        // "Browse" button: pick a local file or directory
        viewButton.addActionListener(new ActionListener() {
            public void actionPerformed(ActionEvent e) {
                JFileChooser jfc = new JFileChooser();
                jfc.setFileSelectionMode(JFileChooser.FILES_AND_DIRECTORIES);
                jfc.showDialog(new JLabel(), "选择");
                File file = jfc.getSelectedFile();
                selectPath.setText(file.getAbsolutePath());
                txtDisplay.setText("");
            }
        });

        // "Upload" button: upload the selected file to HDFS
        uploadButton.addActionListener(new ActionListener() {
            public void actionPerformed(ActionEvent e) {
                String ip = addressText.getText();
                String account = accountText.getText();
                String replication = replicationText.getText();
                String orgFile = selectPath.getText();
                String targetFile = toPathText.getText();
                StringBuffer info = new StringBuffer();
                info.append("-------------参数列表-------------");
                info.append("\n");
                info.append("集群地址:"+"\t"+ip+"\n");
                info.append("账户名称:"+"\t"+account+"\n");
                info.append("HDFS副本数:"+"\t"+replication+"\n");
                info.append("本地路径:"+"\t"+orgFile+"\n");
                info.append("目标路径:"+"\t"+targetFile+"\n");
                info.append("\n");
                info.append("-------------执行过程-------------");
                info.append("\n");
                info.append("查询目标文件...");
                info.append("\n");
                info.append("开始上传文件..."+orgFile);
                info.append("\n");
                txtDisplay.setText(info.toString());
                try {
                    uploadFileOrDir(ip, account, replication, orgFile, targetFile);
                    info.append("恭喜您,文件上传成功!");
                    txtDisplay.setText(info.toString());
                } catch (Exception e1) {
                    info.append("错误信息:");
                    info.append(e1.toString());
                    txtDisplay.setText(info.toString());
                }
            }
        });

        // Lay out the components with GridBagLayout.
        GridBagConstraints constraints = new GridBagConstraints();
        // fill controls how a component is resized when its display area is
        // larger than the component itself:
        //   NONE: keep the original size
        //   HORIZONTAL: stretch horizontally only
        //   VERTICAL: stretch vertically only
        //   BOTH: fill the whole display area
        constraints.fill = GridBagConstraints.BOTH;

        // gridwidth is the number of horizontal cells the component spans
        // (0 means it is the last component in its row).
        // weightx/weighty control how much the component stretches when the
        // window grows (0 = no stretching, values between 0 and 1 stretch).
        constraints.gridwidth = 1;
        constraints.weightx = 0;
        constraints.weighty = 0;
        layout.setConstraints(addressLabel, constraints);
        constraints.gridwidth = 1;
        constraints.weightx = 0;
        constraints.weighty = 0;
        layout.setConstraints(addressText, constraints);
        constraints.gridwidth = 1;
        constraints.weightx = 0;
        constraints.weighty = 0;
        layout.setConstraints(accountLabel, constraints);
        constraints.gridwidth = 1;
        constraints.weightx = 0;
        constraints.weighty = 0;
        layout.setConstraints(accountText, constraints);
        constraints.gridwidth = 1;
        constraints.weightx = 0;
        constraints.weighty = 0;
        layout.setConstraints(replicationLabel, constraints);
        constraints.gridwidth = 0;
        constraints.weightx = 0;
        constraints.weighty = 0;
        layout.setConstraints(replicationText, constraints);

        constraints.gridwidth = 1;
        constraints.weightx = 0;
        constraints.weighty = 0;
        layout.setConstraints(label, constraints);
        constraints.gridwidth = 5;
        constraints.weightx = 0;
        constraints.weighty = 0;
        layout.setConstraints(selectPath, constraints);
        constraints.gridwidth = 0;
        constraints.weightx = 0;
        constraints.weighty = 0;
        layout.setConstraints(viewButton, constraints);

        constraints.gridwidth = 1;
        constraints.weightx = 0;
        constraints.weighty = 0;
        layout.setConstraints(toPathLabel, constraints);
        constraints.gridwidth = 5;
        constraints.weightx = 0;
        constraints.weighty = 0;
        layout.setConstraints(toPathText, constraints);
        constraints.gridwidth = 0;
        constraints.weightx = 0;
        constraints.weighty = 0;
        layout.setConstraints(uploadButton, constraints);

        constraints.gridwidth = 1;
        constraints.weightx = 0;
        constraints.weighty = 0;
        layout.setConstraints(tips, constraints);
        constraints.gridwidth = 0;
        constraints.weightx = 0;
        constraints.weighty = 1;
        layout.setConstraints(resultArea, constraints);

        mainFrame.setResizable(false);
        mainFrame.setVisible(true);
    }

    private void uploadFileOrDir(String ip, String account, String replication, String orgFile, String targetFile) throws URISyntaxException, IOException, InterruptedException {
        Configuration conf = new Configuration();
        // HDFS endpoint (NameNode address)
        conf.set("fs.defaultFS", "hdfs://" + ip + ":9000");
        // Replication factor for files written by this client
        conf.set("dfs.replication", replication);
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://" + ip + ":9000"), conf, account);
        fileSystem.copyFromLocalFile(new Path(orgFile), new Path(targetFile));
        fileSystem.close();
    }


    public static void main(String[] args) {
        HdfsUploadTool hdfsUploadTool = new HdfsUploadTool();
    }

}
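
After the tool reports success, you can double-check the result from any node that has the Hadoop client on its PATH. A small sketch (the file name below is hypothetical; use whatever you uploaded):

    hdfs dfs -ls /                 # list the target directory; the second column is the replication factor
    hdfs dfs -du -h /yourfile.dat  # compare the size with the local file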

Deploying nginx + PHP: Summary

Part 1. Install nginx

yum install nginx

If you get an error like: nginx: [emerg] socket() [::1]:80 failed (97: Address family not supported by protocol)
remove one line from the default config file:
cd /etc/nginx/conf.d/
vi default.conf
Delete the listen line immediately below listen 80 default_server; (the IPv6 listen directive).

Part 2. Install PHP

1. Download
http://www.php.net/downloads.php
Download it on Windows and upload it to the server with the rz command.
Then extract it with tar -zxvf.

2. Install prerequisite packages

yum -y install libxml2-devel
yum -y install openssl openssl-devel
yum -y install bzip2 bzip2-devel
yum -y install curl-devel
yum -y install readline-devel
yum -y install libmcrypt libmcrypt-devel mcrypt mhash

3. Configure the build

./configure --prefix=/usr/local/php --with-config-file-path=/etc --enable-inline-optimization --disable-debug --disable-rpath --enable-shared --enable-opcache --enable-fpm --with-fpm-user=www --with-fpm-group=www --with-mysql=mysqlnd --with-mysqli=mysqlnd --with-pdo-mysql=mysqlnd --with-gettext --enable-mbstring --with-iconv --with-mcrypt --with-mhash --with-openssl --enable-bcmath --enable-soap --with-libxml-dir --enable-pcntl --enable-shmop --enable-sysvmsg --enable-sysvsem --enable-sysvshm --enable-sockets --with-curl --with-zlib --enable-zip --with-bz2 --with-readline --without-sqlite3 --without-pdo-sqlite --with-pear

4. Compile and install PHP

make && make install

5. Set environment variables

vim /etc/profile
PATH=$PATH:/usr/local/php/bin
export PATH

6. Apply the environment variables

source /etc/profile

7. Verify the PHP version

php -v
The output should look like:
PHP 5.6.28 (cli) (built: Nov 21 2016 16:26:20)
Copyright (c) 1997-2016 The PHP Group
Zend Engine v2.6.0, Copyright (c) 1998-2016 Zend Technologies

Part 3. Integrate nginx and PHP

1. Configure php-fpm

cd /usr/local/php/etc
cp php-fpm.conf.default php-fpm.conf

2. Start php-fpm

/usr/local/php/sbin/php-fpm

If you hit this error:
ERROR: [pool www] cannot get uid for user 'www'
Fix it by creating the user:
useradd www
passwd www   (enter the password twice when prompted)

Then set the user and group options in php-fpm.conf to www.

3. Update the nginx configuration

cd /etc/nginx/conf.d/

The server block looks like this:

server {
    listen 80 default_server;
    server_name _;
    #root /usr/share/nginx/html;

    # Load configuration files for the default server block.
    include /etc/nginx/default.d/*.conf;

    location / {
        root  /usr/share/nginx/html;
        index index.html index.htm index.php;
    }

    location ~ \.php$ {
        root          /usr/share/nginx/html;
        fastcgi_pass  127.0.0.1:9000;
        fastcgi_index index.php;
        fastcgi_param SCRIPT_FILENAME /usr/share/nginx/html/$fastcgi_script_name;
        include       fastcgi_params;
    }

    error_page 404 /404.html;
    location = /40x.html {
    }

    error_page 500 502 503 504 /50x.html;
    location = /50x.html {
    }
}
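
To confirm that nginx is actually handing .php requests to php-fpm, a quick check along these lines should work (a sketch; the paths match the configuration above):

echo "<?php phpinfo(); ?>" > /usr/share/nginx/html/info.php   # drop a test page into the web root
nginx -t && nginx -s reload                                   # validate the config and reload nginx
curl -s http://127.0.0.1/info.php | head                      # the phpinfo() HTML should come back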

Part 4. Summary

1. Common directories

nginx configuration directory: /etc/nginx/conf.d/
nginx web root: /usr/share/nginx/html
php-fpm configuration directory: /usr/local/php/etc

2. Startup scripts

nginx startup script: /usr/sbin/nginx

php-fpm startup script: /usr/local/php/sbin/php-fpm

3. Uploading files to the server

Install lrzsz first: yum install -y lrzsz
Then use the rz command.

Proactive Work and Cross-Team Collaboration

Proactive work

Preface

A few days ago Jifeng called and asked me how to collaborate across teams. My answer at the time was fairly off the cuff, so here is a proper summary.
Note: the "proactive work" discussed here is not about being enthusiastic, conscientious, or willing to work overtime; it is about confirming tasks and keeping their status up to date.
I will walk through the stages of handling a task: receiving it, communicating changes, pushing it forward, and giving feedback.

1. Reply and confirm

When you receive a task, reply to the email and CC everyone involved, so the team knows you have accepted it.
If your schedule pushes the task back, briefly explain why.

The reply should cover at least two points:
    roughly when you will be able to start on the task, and roughly how long until you will report back.

2. Report surprises and changes promptly

Plans rarely keep up with change; learn to embrace it.

When a task changes, some people say nothing unless asked, and the whole project suffers for it.
So whenever the status of a task changes, report it promptly to your manager and to any colleagues who may be affected.

3. Close information gaps

Sometimes a person is pulled into a discussion on short notice, or joins a project team midway.
Those already on the project know the full story; the newcomer does not, so bring them up to speed.

4. Push things forward

Anyone responsible for a project, or even just a participant, should consciously push it forward and, where possible, take on work outside their own remit when that work is causing delays.

(1) Keep asking about progress, collect feedback, and make decisions based on it.
(2) Keep seeking resources and people, and ask leadership for support.
(3) Start discussions about the problems and obstacles currently blocking progress, and look for workable solutions and a roadmap.
(4) When people or resources are scarce, adjust the plan or the tasks so the project can route around obstacles and keep moving.
(5) Keep fighting for staffing, including but not limited to pushing recruitment, outsourcing, borrowing people from other departments, or even part-timers.

5. Keep giving feedback

After you finish a task and report it as done, keep watching how it plays out and continue to report on it.

Closing thoughts:

One person's capacity is always limited. Compared with any specific skill, knowing how to approach and handle things matters more: method beats technique.

Spark Basics (6): Building a Maven Project (Scala)

Prerequisite: install the Scala plugin in IDEA (not covered here).

Scala code is far more concise than the Java equivalent. After reading this example you may, like the comedian Xiao Yueyue, exclaim: wow, that's neat!

1. Create a Maven project (not covered here)

2. Add the dependency to the pom

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>1.6.0</version>
</dependency>

3. Write the word-count code

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]) {
    // 1. Create a SparkContext and name this driver application
    new SparkContext(new SparkConf().setAppName("myScalaApp"))
      // 2. Load the data file through the SparkContext
      .textFile("E:\\words.txt", 1)
      // 3. Read each line of the file (each HDFS block, conceptually) and split it into words
      .flatMap(line => line.split(" "))
      // 4. Map each word to a (word, 1) pair
      .map(word => (word, 1))
      // 5. Sum the counts of each word
      //    (both arguments are values: the running total and the next value to add)
      .reduceByKey((x, y) => x + y)
      // 6. collect() is an action: it returns the result to the driver as an array
      .collect()
      // 7. Print each tuple: element 1 is the word, element 2 is the count
      .foreach(tuple => println(tuple._1 + " " + tuple._2))
  }
}

4. When running the code, add the VM option -Dspark.master=local

(screenshot)
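
If you would rather not run it from the IDE, the same class can be packaged and launched with spark-submit. A rough sketch (the jar name is whatever your own mvn package produces):

    mvn package
    spark-submit --class WordCount --master local[2] target/your-artifact-1.0.jar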

5. Sample output

“D:\Program Files\Java\jdk1.8.0_73\bin\java” ……  com.intellij.rt.execution.application.AppMain WordCount

Using Spark’s default log4j profile: org/apache/spark/log4j-defaults.properties

16/03/18 12:48:56 INFO SparkContext: Running Spark version 1.6.0

16/03/18 12:48:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable

16/03/18 12:48:57 INFO SecurityManager: Changing view acls to: maoxiangyi

16/03/18 12:48:57 INFO SecurityManager: Changing modify acls to: maoxiangyi

16/03/18 12:48:57 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(maoxiangyi); users with modify permissions: Set(maoxiangyi)

16/03/18 12:48:58 INFO Utils: Successfully started service ‘sparkDriver’ on port 59490.

16/03/18 12:48:59 INFO Slf4jLogger: Slf4jLogger started

16/03/18 12:48:59 INFO Remoting: Starting remoting

16/03/18 12:48:59 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.56.1:59503]

16/03/18 12:48:59 INFO Utils: Successfully started service ‘sparkDriverActorSystem’ on port 59503.

16/03/18 12:48:59 INFO SparkEnv: Registering MapOutputTracker

16/03/18 12:48:59 INFO SparkEnv: Registering BlockManagerMaster

16/03/18 12:48:59 INFO DiskBlockManager: Created local directory at C:\Users\maoxiangyi\AppData\Local\Temp\blockmgr-56901a95-06c1-45fe-95d5-eea4fb939180

16/03/18 12:48:59 INFO MemoryStore: MemoryStore started with capacity 2.4 GB

16/03/18 12:48:59 INFO SparkEnv: Registering OutputCommitCoordinator

16/03/18 12:48:59 INFO Utils: Successfully started service ‘SparkUI’ on port 4040.

16/03/18 12:48:59 INFO SparkUI: Started SparkUI at http://192.168.56.1:4040

16/03/18 12:48:59 INFO Executor: Starting executor ID driver on host localhost

16/03/18 12:48:59 INFO Utils: Successfully started service ‘org.apache.spark.network.netty.NettyBlockTransferService’ on port 59510.

16/03/18 12:48:59 INFO NettyBlockTransferService: Server created on 59510

16/03/18 12:48:59 INFO BlockManagerMaster: Trying to register BlockManager

16/03/18 12:48:59 INFO BlockManagerMasterEndpoint: Registering block manager localhost:59510 with 2.4 GB RAM, BlockManagerId(driver, localhost, 59510)

16/03/18 12:48:59 INFO BlockManagerMaster: Registered BlockManager

16/03/18 12:49:00 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 107.7 KB, free 107.7 KB)

16/03/18 12:49:00 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 9.8 KB, free 117.5 KB)

16/03/18 12:49:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:59510 (size: 9.8 KB, free: 2.4 GB)

16/03/18 12:49:00 INFO SparkContext: Created broadcast 0 from textFile at WordCount.scala:7

16/03/18 12:49:02 WARN : Your hostname, maoxiangyi-PC resolves to a loopback/non-reachable address: fe80:0:0:0:0:5efe:ac10:20%net13, but we couldn’t find any external IP address!

16/03/18 12:49:03 INFO FileInputFormat: Total input paths to process : 1

16/03/18 12:49:03 INFO SparkContext: Starting job: collect at WordCount.scala:11

16/03/18 12:49:03 INFO DAGScheduler: Registering RDD 3 (map at WordCount.scala:9)

16/03/18 12:49:03 INFO DAGScheduler: Got job 0 (collect at WordCount.scala:11) with 1 output partitions

16/03/18 12:49:03 INFO DAGScheduler: Final stage: ResultStage 1 (collect at WordCount.scala:11)

16/03/18 12:49:03 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)

16/03/18 12:49:03 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)

16/03/18 12:49:03 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCount.scala:9), which has no missing parents

16/03/18 12:49:03 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.9 KB, free 121.4 KB)

16/03/18 12:49:03 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.2 KB, free 123.6 KB)

16/03/18 12:49:03 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:59510 (size: 2.2 KB, free: 2.4 GB)

16/03/18 12:49:03 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006

16/03/18 12:49:03 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCount.scala:9)

16/03/18 12:49:03 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks

16/03/18 12:49:03 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2028 bytes)

16/03/18 12:49:03 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

16/03/18 12:49:03 INFO HadoopRDD: Input split: file:/E:/words.txt:0+37

16/03/18 12:49:03 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id

16/03/18 12:49:03 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id

16/03/18 12:49:03 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap

16/03/18 12:49:03 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition

16/03/18 12:49:03 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id

16/03/18 12:49:03 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2253 bytes result sent to driver

16/03/18 12:49:03 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 160 ms on localhost (1/1)

16/03/18 12:49:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool

16/03/18 12:49:03 INFO DAGScheduler: ShuffleMapStage 0 (map at WordCount.scala:9) finished in 0.175 s

16/03/18 12:49:03 INFO DAGScheduler: looking for newly runnable stages

16/03/18 12:49:03 INFO DAGScheduler: running: Set()

16/03/18 12:49:03 INFO DAGScheduler: waiting: Set(ResultStage 1)

16/03/18 12:49:03 INFO DAGScheduler: failed: Set()

16/03/18 12:49:03 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[4] at reduceByKey at WordCount.scala:10), which has no missing parents

16/03/18 12:49:03 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.5 KB, free 126.0 KB)

16/03/18 12:49:03 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1520.0 B, free 127.5 KB)

16/03/18 12:49:03 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:59510 (size: 1520.0 B, free: 2.4 GB)

16/03/18 12:49:03 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006

16/03/18 12:49:03 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (ShuffledRDD[4] at reduceByKey at WordCount.scala:10)

16/03/18 12:49:03 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks

16/03/18 12:49:03 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,NODE_LOCAL, 1813 bytes)

16/03/18 12:49:03 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)

16/03/18 12:49:03 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks

16/03/18 12:49:03 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 21 ms

16/03/18 12:49:03 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1390 bytes result sent to driver

16/03/18 12:49:03 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 75 ms on localhost (1/1)

16/03/18 12:49:03 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool

16/03/18 12:49:03 INFO DAGScheduler: ResultStage 1 (collect at WordCount.scala:11) finished in 0.077 s

16/03/18 12:49:03 INFO DAGScheduler: Job 0 finished: collect at WordCount.scala:11, took 0.383501 s

am 3

i 3

tom 1

hanmeimei 1

lilei 1

16/03/18 12:49:03 INFO SparkContext: Invoking stop() from shutdown hook

16/03/18 12:49:03 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040

16/03/18 12:49:03 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

16/03/18 12:49:03 INFO MemoryStore: MemoryStore cleared

16/03/18 12:49:03 INFO BlockManager: BlockManager stopped

16/03/18 12:49:03 INFO BlockManagerMaster: BlockManagerMaster stopped

16/03/18 12:49:03 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

16/03/18 12:49:03 INFO SparkContext: Successfully stopped SparkContext

16/03/18 12:49:03 INFO ShutdownHookManager: Shutdown hook called

16/03/18 12:49:03 INFO ShutdownHookManager: Deleting directory C:\Users\maoxiangyi\AppData\Local\Temp\spark-cc708ad4-410f-415f-9dc2-06c84144e792

16/03/18 12:49:03 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.

16/03/18 12:49:03 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports

 

Spark Basics (5): Building a Maven Project (Java)

1. Create a Maven project (not covered here)

2. Add the dependency to the pom

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>1.6.0</version>
</dependency>

3. Write the word-count code. Note: I am using JDK 1.8; the forEach(Consumer) call at the end relies on the functional-style APIs introduced in JDK 1.8.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class WordCount {

    public static void main(String[] args) {
        // Build the configuration
        SparkConf sparkConf = new SparkConf().setAppName("javaSparkApp");
        // 1. Create the core SparkContext; in Java it is wrapped as JavaSparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
        // 2. Load the data file with textFile() and cache it in memory.
        //    Note: textFile and cache are transformations, so nothing runs yet.
        JavaRDD textRDD = sparkContext.textFile("E:\\words.txt").cache();
        // 3. flatMap over the RDD, processing the file line by line
        textRDD.flatMap(new FlatMapFunction() {
            // 3.1 Split each line into words and return them as an Iterable
            //     (ArrayList is one implementation of that interface)
            public Iterable call(Object o) throws Exception {
                String message = (String) o;
                String[] strArr = message.split(" ");
                List<String> wordList = new ArrayList<String>();
                for (String word : strArr) {
                    wordList.add(word);
                }
                return wordList;
            }
        // 4. Turn the plain RDD into a JavaPairRDD of (word, 1)
        }).mapToPair(new PairFunction() {
            public Tuple2 call(Object o) throws Exception {
                return new Tuple2<String, Integer>((String) o, 1);
            }
        // 5. Sum the values that share the same key
        //    Note: both arguments are values - the running total and the next value to add
        }).reduceByKey(new Function2() {
            public Object call(Object v1, Object v2) throws Exception {
                return (Integer) v1 + (Integer) v2;
            }
        // 6. collect() returns the final result to the driver as a List of tuples.
        //    Note: collect is an action - only here does the pipeline above actually execute.
        }).collect().forEach(new Consumer() {
            public void accept(Object o) {
                Tuple2 tuple = (Tuple2) o;
                System.out.println(tuple._1() + ": " + tuple._2());
            }
        });
        sparkContext.stop();
    }
}

4. When running the code, add the VM option -Dspark.master=local

(screenshot)

5. Sample output

“D:\Program Files\Java\jdk1.8.0_73\bin\java”  ……  com.intellij.rt.execution.application.AppMain cn.maoxiangyi.spark.WordCount

Using Spark’s default log4j profile: org/apache/spark/log4j-defaults.properties

16/03/18 11:50:43 INFO SparkContext: Running Spark version 1.6.0

16/03/18 11:50:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable

16/03/18 11:50:44 INFO SecurityManager: Changing view acls to: maoxiangyi

16/03/18 11:50:44 INFO SecurityManager: Changing modify acls to: maoxiangyi

16/03/18 11:50:44 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(maoxiangyi); users with modify permissions: Set(maoxiangyi)

16/03/18 11:50:44 INFO Utils: Successfully started service ‘sparkDriver’ on port 58766.

16/03/18 11:50:45 INFO Slf4jLogger: Slf4jLogger started

16/03/18 11:50:45 INFO Remoting: Starting remoting

16/03/18 11:50:45 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.56.1:58779]

16/03/18 11:50:45 INFO Utils: Successfully started service ‘sparkDriverActorSystem’ on port 58779.

16/03/18 11:50:45 INFO SparkEnv: Registering MapOutputTracker

16/03/18 11:50:45 INFO SparkEnv: Registering BlockManagerMaster

16/03/18 11:50:45 INFO DiskBlockManager: Created local directory at C:\Users\maoxiangyi\AppData\Local\Temp\blockmgr-4ea15741-c166-4376-94bf-7044fb58e6c8

16/03/18 11:50:45 INFO MemoryStore: MemoryStore started with capacity 2.4 GB

16/03/18 11:50:45 INFO SparkEnv: Registering OutputCommitCoordinator

16/03/18 11:50:45 INFO Utils: Successfully started service ‘SparkUI’ on port 4040.

16/03/18 11:50:45 INFO SparkUI: Started SparkUI at http://192.168.56.1:4040

16/03/18 11:50:45 INFO Executor: Starting executor ID driver on host localhost

16/03/18 11:50:45 INFO Utils: Successfully started service ‘org.apache.spark.network.netty.NettyBlockTransferService’ on port 58786.

16/03/18 11:50:45 INFO NettyBlockTransferService: Server created on 58786

16/03/18 11:50:45 INFO BlockManagerMaster: Trying to register BlockManager

16/03/18 11:50:45 INFO BlockManagerMasterEndpoint: Registering block manager localhost:58786 with 2.4 GB RAM, BlockManagerId(driver, localhost, 58786)

16/03/18 11:50:45 INFO BlockManagerMaster: Registered BlockManager

16/03/18 11:50:46 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 107.7 KB, free 107.7 KB)

16/03/18 11:50:46 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 9.8 KB, free 117.5 KB)

16/03/18 11:50:46 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:58786 (size: 9.8 KB, free: 2.4 GB)

16/03/18 11:50:46 INFO SparkContext: Created broadcast 0 from textFile at WordCount.java:24

16/03/18 11:50:48 WARN : Your hostname, maoxiangyi-PC resolves to a loopback/non-reachable address: fe80:0:0:0:0:5efe:ac10:20%net13, but we couldn’t find any external IP address!

16/03/18 11:50:49 INFO FileInputFormat: Total input paths to process : 1

16/03/18 11:50:49 INFO SparkContext: Starting job: collect at WordCount.java:50

16/03/18 11:50:49 INFO DAGScheduler: Registering RDD 3 (mapToPair at WordCount.java:38)

16/03/18 11:50:49 INFO DAGScheduler: Got job 0 (collect at WordCount.java:50) with 1 output partitions

16/03/18 11:50:49 INFO DAGScheduler: Final stage: ResultStage 1 (collect at WordCount.java:50)

16/03/18 11:50:49 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)

16/03/18 11:50:49 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)

16/03/18 11:50:49 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at mapToPair at WordCount.java:38), which has no missing parents

16/03/18 11:50:49 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.6 KB, free 122.1 KB)

16/03/18 11:50:49 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.6 KB, free 124.7 KB)

16/03/18 11:50:49 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:58786 (size: 2.6 KB, free: 2.4 GB)

16/03/18 11:50:49 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006

16/03/18 11:50:49 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at mapToPair at WordCount.java:38)

16/03/18 11:50:49 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks

16/03/18 11:50:49 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2028 bytes)

16/03/18 11:50:49 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

16/03/18 11:50:49 INFO CacheManager: Partition rdd_1_0 not found, computing it

16/03/18 11:50:49 INFO HadoopRDD: Input split: file:/E:/words.txt:0+37

16/03/18 11:50:49 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id

16/03/18 11:50:49 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id

16/03/18 11:50:49 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap

16/03/18 11:50:49 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition

16/03/18 11:50:49 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id

16/03/18 11:50:49 INFO MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 224.0 B, free 124.9 KB)

16/03/18 11:50:49 INFO BlockManagerInfo: Added rdd_1_0 in memory on localhost:58786 (size: 224.0 B, free: 2.4 GB)

16/03/18 11:50:49 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2752 bytes result sent to driver

16/03/18 11:50:49 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 162 ms on localhost (1/1)

16/03/18 11:50:49 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool

16/03/18 11:50:49 INFO DAGScheduler: ShuffleMapStage 0 (mapToPair at WordCount.java:38) finished in 0.180 s

16/03/18 11:50:49 INFO DAGScheduler: looking for newly runnable stages

16/03/18 11:50:49 INFO DAGScheduler: running: Set()

16/03/18 11:50:49 INFO DAGScheduler: waiting: Set(ResultStage 1)

16/03/18 11:50:49 INFO DAGScheduler: failed: Set()

16/03/18 11:50:49 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[4] at reduceByKey at WordCount.java:44), which has no missing parents

16/03/18 11:50:49 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.8 KB, free 127.7 KB)

16/03/18 11:50:49 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1683.0 B, free 129.3 KB)

16/03/18 11:50:49 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:58786 (size: 1683.0 B, free: 2.4 GB)

16/03/18 11:50:49 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006

16/03/18 11:50:49 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (ShuffledRDD[4] at reduceByKey at WordCount.java:44)

16/03/18 11:50:49 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks

16/03/18 11:50:49 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,NODE_LOCAL, 1813 bytes)

16/03/18 11:50:49 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)

16/03/18 11:50:49 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks

16/03/18 11:50:49 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms

16/03/18 11:50:49 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1390 bytes result sent to driver

16/03/18 11:50:49 INFO DAGScheduler: ResultStage 1 (collect at WordCount.java:50) finished in 0.039 s

16/03/18 11:50:49 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 39 ms on localhost (1/1)

16/03/18 11:50:49 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool

16/03/18 11:50:49 INFO DAGScheduler: Job 0 finished: collect at WordCount.java:50, took 0.328826 s

am: 3

i: 3

tom: 1

hanmeimei: 1

lilei: 1

16/03/18 11:50:49 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040

16/03/18 11:50:49 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

16/03/18 11:50:49 INFO MemoryStore: MemoryStore cleared

16/03/18 11:50:49 INFO BlockManager: BlockManager stopped

16/03/18 11:50:49 INFO BlockManagerMaster: BlockManagerMaster stopped

16/03/18 11:50:49 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

16/03/18 11:50:49 INFO SparkContext: Successfully stopped SparkContext

16/03/18 11:50:49 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.

16/03/18 11:50:49 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.

16/03/18 11:50:49 INFO ShutdownHookManager: Shutdown hook called

16/03/18 11:50:49 INFO ShutdownHookManager: Deleting directory C:\Users\maoxiangyi\AppData\Local\Temp\spark-29734c78-b621-4133-a166-f7657f84930f

16/03/18 11:50:49 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.

Spark Basics (4): Watching the SparkSubmit Startup Sequence with jvisualvm

1. Run the following command on the server:

    /export/servers/jdk/bin/java -cp /export/servers/spark/conf/:/export/servers/spark/lib/spark-assembly-1.6.1-hadoop2.6.0.jar:/export/servers/spark/lib/datanucleus-api-jdo-3.2.6.jar:/export/servers/spark/lib/datanucleus-core-3.2.10.jar:/export/servers/spark/lib/datanucleus-rdbms-3.2.9.jar -Dscala.usejavacp=true -Xms1g -Xmx1g -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10207 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name "Spark shell" spark-shell

2. On Windows, launch the jvisualvm.exe tool

    The tool is in the bin directory of your JDK installation, for example D:\Program Files\Java\jdk1.8.0_73\bin.

3. Once jvisualvm.exe is running, click File -> Add JMX Connection

(screenshot)

4. In the Add JMX Connection dialog, enter the server on which the spark-shell command is running (and the JMX port, 10207 in the command above)

(screenshot)

5. Click Connect and switch to the Threads tab right away. Following the arrows in the screenshot, you can clearly see the order in which the Spark-related threads start.

(screenshot)

6. After looking at the overall startup order, select the main thread and take a thread dump to see the methods called while SparkSubmit initializes.

(screenshot)
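
If you only need the thread dump and not the GUI, the same information is available on the server itself with two tools that ship with the JDK:

    jps -l         # find the PID of org.apache.spark.deploy.SparkSubmit
    jstack <pid>   # dump all thread stacks, including the main thread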

7. In the screenshot below, the lower-left pane shows the dumped main thread, and the right side shows the startup order:

org.apache.spark.deploy.SparkSubmit, then (via reflection) org.apache.spark.repl.Main, then org.apache.spark.repl.SparkILoop.process

(screenshot)

Note: used well, jvisualvm is a great help for understanding how all kinds of Java frameworks start up and run.

Spark Basics (3): Reading the spark-shell Startup Script

1. At the console we type: spark-shell

2. The spark-shell script calls the spark-submit script in the bin directory

(screenshot)

3. spark-submit in turn calls the spark-class script, passing it one argument: org.apache.spark.deploy.SparkSubmit

(screenshot)

4. After a series of steps, spark-class finally runs exec "${CMD[@]}"

(screenshot)

5. By adding an echo statement to the spark-class script, we can print the contents of "${CMD[@]}":

/export/servers/jdk/bin/java -cp /export/servers/spark/conf/:/export/servers/spark/lib/spark-assembly-1.6.1-hadoop2.6.0.jar:/export/servers/spark/lib/datanucleus-api-jdo-3.2.6.jar:/export/servers/spark/lib/datanucleus-core-3.2.10.jar:/export/servers/spark/lib/datanucleus-rdbms-3.2.9.jar -Dscala.usejavacp=true -Xms1g -Xmx1g org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name Spark shell spark-shell
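
For reference, the change that produces this output is a single line added just above the exec call near the end of bin/spark-class (a sketch based on the Spark 1.6 script layout):

    echo "${CMD[@]}"   # temporary debug line: print the full java command
    exec "${CMD[@]}"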

   A side note: what does java -cp mean?

-cp is the same as -classpath: it lists the paths the class needs at run time, usually class libraries and jar files, each given with its full path.

     On Windows the entries are separated by a semicolon ";",

     on Linux by a colon ":".

    Older JDKs required every jar to be listed individually; since Java 6 a wildcard such as lib/* also works. A single dot "." stands for the current directory.
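
    For example, on Linux (the jar names and main class here are made up for illustration):

    java -cp /opt/libs/a.jar:/opt/libs/b.jar:. com.example.Main
    # the same command on Windows would separate the entries with ";" instead of ":"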

6. So, as step 5 shows, starting spark-shell really just starts a JVM, roughly like this:

/export/servers/jdk/bin/java  org.apache.spark.deploy.SparkSubmit

Spark Basics (2): Spark Hello World

1. Prepare a data file

vi /root/content.log

    Enter the following content:

    i am hanmeimei

    i am lilei

    i am tom
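
    Equivalently, the file can be created non-interactively (same content as above):

    cat > /root/content.log <<'EOF'
i am hanmeimei
i am lilei
i am tom
EOF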

2. Start the Spark shell

    At the console, run: spark-shell

(screenshot)

3. Load the data

val lines = sc.textFile("/root/content.log", 2)

(screenshot)

4. Split each line into words on spaces

val words = lines.flatMap(line => line.split(" "))

(screenshot)

5. Pair each word with the number 1, ready for counting

val ones = words.map(w => (w, 1))

(screenshot)

6. Do the computation: merge identical words and sum up each word's occurrences, much like Hadoop's shuffle.

    val counts = ones.reduceByKey(_+_)

(screenshot)

7. Print the results

counts.foreach(println)

(screenshot)

Spark Basics (1): Installing the Spark Environment

This chapter covers:

  • Installing the JDK
  • Installing Scala
  • Installing Spark

Part 1: Install the JDK


1. Check whether a JDK is already installed

    A freshly installed system usually has no JDK, although some Linux distributions ship OpenJDK by default. OpenJDK is developed and released from the code base that Oracle open-sourced.

    To check whether a JDK is already installed, run: sudo update-alternatives --config java

(screenshot)

2. Download the JDK

     Since the system does not have the Oracle JDK we need, we have to install one ourselves.

     Find the download URL on Oracle's website and download it with wget, or download it on your PC and upload it to the Linux host.

wget http://download.oracle.com/otn-pub/java/jdk/8u60-b27/jdk-8u60-linux-x64.tar.gz?AuthParam=1441700527_0f80e68acf711ce8da8af237439f406a

(screenshot)

3. Extract the archive

       mv jdk-8u60-linux*   jdk-8u60-linux-x64.tar.gz

       tar -zxvf jdk-8u60-linux-x64.tar.gz -C ../install/

4. Add the JDK to the environment variables (append the following to /etc/profile)

        #set java env

        export JAVA_HOME=/export/servers/jdk

        export JRE_HOME=${JAVA_HOME}/jre

        export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib

        export PATH=${JAVA_HOME}/bin:$PATH

5. Apply the configuration

        source /etc/profile

6. Make the Oracle JDK the default JDK

update-alternatives --install /usr/bin/java java /export/servers/jdk/bin/java 300

update-alternatives --install /usr/bin/javac javac /export/servers/jdk/bin/javac 300

7. Check the JDK version on this machine

(screenshot)
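
The same check can be done from the command line; after step 6 the Oracle JDK should be the active alternative:

    java -version
    javac -version
    update-alternatives --config java   # the Oracle JDK should be marked as the current choice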


Part 2: Install Scala


1. Download Scala

http://www.scala-lang.org/download/2.11.8.html (scroll to the bottom of the page and pick the Linux tarball)

wget http://downloads.lightbend.com/scala/2.11.8/scala-2.11.8.tgz

2. Extract and install Scala

mv scala-2.11.8.tgz /export/software/

tar -xzvf scala-2.11.8.tgz -C ../servers/

cd ../servers/

ln -s scala-2.11.8 scala

3. Set environment variables

vi /etc/profile

    Add the following:

    #set scala env

    export SCALA_HOME=/export/servers/scala

    export PATH=${SCALA_HOME}/bin:$PATH

    Apply the configuration:

source /etc/profile

4. Verify Scala is installed on every machine

    Run the command: scala

(screenshot)

Part 3: Install Spark

1. Download the Spark package

http://spark.apache.org/downloads.html (choose the build matching your Hadoop version)

http://apache.opencas.org/spark/spark-1.6.1/spark-1.6.1-bin-hadoop2.6.tgz (pick a mirror link)

2. Extract and install Spark

mv spark-1.6.1-bin-hadoop2.6.tgz /export/software/

tar -zxvf spark-1.6.1-bin-hadoop2.6.tgz -C /export/servers/

cd /export/servers/

ln -s spark-1.6.1-bin-hadoop2.6 spark

3. Set environment variables

vi /etc/profile

    Add the following:

#set spark env

    export SPARK_HOME=/export/servers/spark

    export PATH=${SPARK_HOME}/bin:$PATH

    Apply the configuration:

source /etc/profile

4. Edit the Spark configuration file

cd $SPARK_HOME/conf

cp spark-env.sh.template spark-env.sh

    vi spark-env.sh

    Add the following:

export SPARK_MASTER_IP=127.0.0.1

    export SPARK_LOCAL_IP=127.0.0.1

5. Start Spark

   At the console, run: spark-shell

(screenshot)