The previous five posts introduced Hadoop's main modules; with the theory covered, some hands-on work is needed to really absorb it. In this post I will take my time, starting from building the environment and moving on to actually using Hadoop, step by step.
This post is organized into the following parts:
- Planning the deployment nodes
- Passwordless SSH and network configuration
- Setting up the ZooKeeper cluster
- Setting up the Hadoop cluster
- Submitting a MapReduce job to the cluster remotely from IDEA
Planning the deployment nodes
HDFS high availability needs at least two NameNodes (NN), and keeping three replicas needs three DataNodes (DN). YARN high availability needs at least two ResourceManagers (RM), and since computation moves to the data, a NodeManager (NM) runs on every DataNode. ZooKeeper (ZK) needs three nodes to elect a leader, and JournalNode (JN) needs three nodes so that a majority of them can acknowledge writes.
Based on these considerations, the nodes are laid out as follows:
| Host | IP | NN | RM | ZKFC | DN | NM | JN | ZK |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| server01 | 192.168.0.111 | • | • | • | | | | |
| server02 | 192.168.0.112 | • | • | • | | | | |
| server03 | 192.168.0.113 | | | | • | • | • | • |
| server04 | 192.168.0.114 | | | | • | • | • | • |
| server05 | 192.168.0.115 | | | | • | • | • | • |
Notes:
1. ZKFC must run on the same node as a NameNode for failover election to work.
2. DataNode and NodeManager must run on the same node, which is what moving computation to the data means in practice.
3. Create a hadoop user and group; all of the services above are started as the hadoop user.
Passwordless SSH and network configuration
First of all, disable the firewall on all five machines; this is critical. Otherwise, after ZooKeeper starts, zkServer.sh status cannot report the node state because the three ZooKeeper nodes cannot reach each other, and the second NameNode will also fail to sync data from the first.
```
[root@server04 home]# systemctl stop firewalld
[root@server04 home]# systemctl disable firewalld
Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.
```
Change the hostname in /etc/hostname on each machine according to the plan.
Then add name resolution for all five hosts to /etc/hosts.
```
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.0.111 server01
192.168.0.112 server02
192.168.0.113 server03
192.168.0.114 server04
192.168.0.115 server05
```
Passwordless SSH
Since scripts run from server01 and server02 may start daemons on the other nodes, server01 and server02 need passwordless SSH access to all of the other nodes.
Taking server01 as an example, run the following commands in order to distribute server01's public key to the other nodes and enable passwordless login.
One thing to watch: ssh-copy-id creates authorized_keys with permission 600. If the permission ends up as 644, passwordless login will not work; SSH is very strict about this file's permissions.
```
[hadoop@server01 ~]$ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
[hadoop@server01 ~]$ ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop@server01
[hadoop@server01 ~]$ ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop@server02
[hadoop@server01 ~]$ ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop@server03
[hadoop@server01 ~]$ ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop@server04
[hadoop@server01 ~]$ ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop@server05
```
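If passwordless login still fails after ssh-copy-id, it is almost always the permissions mentioned above. A quick check-and-fix sketch; 700 on ~/.ssh and 600 on authorized_keys are the standard sshd requirements, nothing specific to this cluster:

```bash
# run on each target node as the hadoop user
chmod 700 ~/.ssh
chmod 600 ~/.ssh/authorized_keys
ls -l ~/.ssh/authorized_keys    # expect -rw------- with owner hadoop
```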
Setting up the ZooKeeper cluster
Download ZooKeeper
Download apache-zookeeper-3.5.8-bin.tar.gz from the official site. Note that the package without -bin in its name contains only the source code and cannot start QuorumPeerMain.
Install ZooKeeper
Extract apache-zookeeper-3.5.8-bin.tar.gz into /usr and change its owner and group to hadoop.
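A minimal sketch of that step, assuming the tarball was downloaded to /home/hadoop (adjust the path to wherever you saved it):

```bash
# run as root on server03, server04, and server05
tar -xzf /home/hadoop/apache-zookeeper-3.5.8-bin.tar.gz -C /usr
chown -R hadoop:hadoop /usr/apache-zookeeper-3.5.8-bin
```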
Add the following to the environment variables:
```
ZOOKEEPER_HOME=/usr/apache-zookeeper-3.5.8-bin
PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin
```
Copy /usr/apache-zookeeper-3.5.8-bin/conf/zoo_sample.cfg to zoo.cfg in the same directory; this is the configuration file ZooKeeper reads at startup.
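That copy is a one-liner:

```bash
cp /usr/apache-zookeeper-3.5.8-bin/conf/zoo_sample.cfg /usr/apache-zookeeper-3.5.8-bin/conf/zoo.cfg
```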
Taking server03 as an example, the edited zoo.cfg looks like this:
```
[hadoop@server03 home]$ cat /usr/apache-zookeeper-3.5.8-bin/conf/zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
# the myid file for this node also lives in this directory
dataDir=/opt/zookeeper/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
# The number in server.x must match the node's myid; the three ZooKeeper nodes follow.
# Port 2888 is used for communication between cluster nodes.
# Port 3888 is used for leader election.
server.3=server03:2888:3888
server.4=server04:2888:3888
server.5=server05:2888:3888
```
Create the myid file under /opt/zookeeper/data:
```
[hadoop@server03 home]$ echo 3 > /opt/zookeeper/data/myid
```
That completes the ZooKeeper setup on server03; server04 and server05 follow the same steps.
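The only per-node difference is the myid value, which has to match the server.x entry in zoo.cfg, so on the other two nodes it looks like this:

```bash
# on server04 (matches server.4 in zoo.cfg)
echo 4 > /opt/zookeeper/data/myid
# on server05 (matches server.5 in zoo.cfg)
echo 5 > /opt/zookeeper/data/myid
```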
Start ZooKeeper on each of the three machines.
```
[hadoop@server03 home]$ zkServer.sh start
[hadoop@server03 home]$ jps
3762 Jps
1594 QuorumPeerMain
[hadoop@server03 home]$ zkServer.sh status
/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/apache-zookeeper-3.5.8-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
```
server03 has been elected leader.
Setting up the Hadoop cluster
Hadoop is built on Java, so the JDK must be installed first. This walkthrough uses jdk-11.0.8; after installing it, add JAVA_HOME to the environment variables.
Download Hadoop
Download hadoop-3.3.0.tar.gz from the official site, extract it into /usr, and change its owner and group to hadoop.
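As with ZooKeeper, a sketch of the extract-and-chown step, assuming the tarball sits in /home/hadoop:

```bash
# run as root on every node in the cluster
tar -xzf /home/hadoop/hadoop-3.3.0.tar.gz -C /usr
chown -R hadoop:hadoop /usr/hadoop-3.3.0
```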
Configure Hadoop
The official documentation for Apache Hadoop 3.3.0 explains the configuration in detail; it mostly comes down to putting the single-node settings and the high-availability settings into the right files, but I will walk through them here. Before that, edit /usr/hadoop-3.3.0/etc/hadoop/hadoop-env.sh and add export JAVA_HOME=/usr/java/jdk-11.0.8 so that the Hadoop scripts can find the JDK.
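Appending the line is enough; a sketch of the edit:

```bash
# run on every node as the hadoop user
echo 'export JAVA_HOME=/usr/java/jdk-11.0.8' >> /usr/hadoop-3.3.0/etc/hadoop/hadoop-env.sh
```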
Then add the following to the environment variables:
```
HADOOP_HOME=/usr/hadoop-3.3.0
PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
```
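Putting the variables from this post together, the login profile ends up looking roughly like the sketch below; /etc/profile.d/hadoop.sh is my own choice of location, and any profile file sourced at login works just as well:

```bash
# /etc/profile.d/hadoop.sh  (location is an assumption, not from the original setup)
export JAVA_HOME=/usr/java/jdk-11.0.8
export ZOOKEEPER_HOME=/usr/apache-zookeeper-3.5.8-bin
export HADOOP_HOME=/usr/hadoop-3.3.0
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
```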
1. /usr/hadoop-3.3.0/etc/hadoop/core-site.xml
```xml
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://mycluster</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/opt/hadoop/tmp</value>
  </property>
  <property>
    <name>ha.zookeeper.quorum</name>
    <value>server03:2181,server04:2181,server05:2181</value>
  </property>
</configuration>
```
2. /usr/hadoop-3.3.0/etc/hadoop/hdfs-site.xml
```xml
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
  <property>
    <name>dfs.nameservices</name>
    <value>mycluster</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.mycluster</name>
    <value>nn1,nn2</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn1</name>
    <value>server01:8020</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn2</name>
    <value>server02:8020</value>
  </property>
  <property>
    <name>dfs.namenode.http-address.mycluster.nn1</name>
    <value>server01:9870</value>
  </property>
  <property>
    <name>dfs.namenode.http-address.mycluster.nn2</name>
    <value>server02:9870</value>
  </property>
  <property>
    <name>dfs.namenode.shared.edits.dir</name>
    <value>qjournal://server03:8485;server04:8485;server05:8485/mycluster</value>
  </property>
  <property>
    <name>dfs.client.failover.proxy.provider.mycluster</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
  <property>
    <name>dfs.ha.fencing.methods</name>
    <value>sshfence</value>
  </property>
  <property>
    <name>dfs.ha.fencing.ssh.private-key-files</name>
    <value>/home/hadoop/.ssh/id_rsa</value>
  </property>
  <property>
    <name>dfs.ha.fencing.ssh.connect-timeout</name>
    <value>30000</value>
  </property>
  <property>
    <name>dfs.journalnode.edits.dir</name>
    <value>/opt/hadoop/journaldata</value>
  </property>
  <property>
    <name>dfs.ha.nn.not-become-active-in-safemode</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.ha.automatic-failover.enabled</name>
    <value>true</value>
  </property>
</configuration>
```
3. /usr/hadoop-3.3.0/etc/hadoop/mapred-site.xml
```xml
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
  <property>
    <name>mapreduce.application.classpath</name>
    <value>$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/mapreduce/lib/*</value>
  </property>
  <property>
    <name>yarn.app.mapreduce.am.env</name>
    <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
  </property>
  <property>
    <name>mapreduce.map.env</name>
    <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
  </property>
  <property>
    <name>mapreduce.reduce.env</name>
    <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
  </property>
</configuration>
```
4. /usr/hadoop-3.3.0/etc/hadoop/yarn-site.xml
```xml
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.application.classpath</name>
    <value>paste the output of running hadoop classpath on server01 here</value>
  </property>
  <property>
    <name>yarn.nodemanager.env-whitelist</name>
    <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
  </property>
  <property>
    <name>yarn.resourcemanager.ha.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>yarn.resourcemanager.cluster-id</name>
    <value>cluster1</value>
  </property>
  <property>
    <name>yarn.resourcemanager.ha.rm-ids</name>
    <value>rm1,rm2</value>
  </property>
  <property>
    <name>yarn.resourcemanager.hostname.rm1</name>
    <value>server01</value>
  </property>
  <property>
    <name>yarn.resourcemanager.hostname.rm2</name>
    <value>server02</value>
  </property>
  <property>
    <name>yarn.resourcemanager.webapp.address.rm1</name>
    <value>server01:8088</value>
  </property>
  <property>
    <name>yarn.resourcemanager.webapp.address.rm2</name>
    <value>server02:8088</value>
  </property>
  <property>
    <name>hadoop.zk.address</name>
    <value>server03:2181,server04:2181,server05:2181</value>
  </property>
</configuration>
```
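The value of yarn.application.classpath is just whatever hadoop classpath prints; capture it on server01 and paste the single line it returns into the property above:

```bash
# run on server01 after HADOOP_HOME and PATH are set
hadoop classpath
```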
5. /usr/hadoop-3.3.0/etc/hadoop/workers (in Hadoop 3.x the list of worker nodes goes in the workers file)
```
server03
server04
server05
```
Start ZooKeeper
On server03, server04, and server05, start ZooKeeper as the hadoop user with zkServer.sh start. Once all three are up, run zkServer.sh status on each machine to see whether that node is the leader or a follower.
```
[hadoop@server03 ~]$ zkServer.sh start
/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/apache-zookeeper-3.5.8-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[hadoop@server03 ~]$ jps
3383 QuorumPeerMain
3422 Jps
[hadoop@server03 ~]$ zkServer.sh status
/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/apache-zookeeper-3.5.8-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
```
Start the JournalNodes
Before starting the JournalNodes, recall this setting we made in hdfs-site.xml:
```xml
<property>
  <name>dfs.journalnode.edits.dir</name>
  <value>/opt/hadoop/journaldata</value>
</property>
```
Create this directory as root first, then change its owner and group to hadoop.
```
[root@server04 ~]# cd /opt
[root@server04 opt]# ll
總用量 0
drwxr-xr-x. 3 hadoop hadoop 18 8月  8 23:37 zookeeper
[root@server04 opt]# mkdir -p hadoop/journaldata
[root@server04 opt]# chown -R hadoop:hadoop hadoop/
[root@server04 opt]# ll
總用量 0
drwxr-xr-x. 3 hadoop hadoop 25 8月 11 22:42 hadoop
drwxr-xr-x. 3 hadoop hadoop 18 8月  8 23:37 zookeeper
```
On server03, server04, and server05, run hadoop-daemon.sh start journalnode to start the JournalNodes.
```
[hadoop@server03 sbin]$ hadoop-daemon.sh start journalnode
WARNING: Use of this script to start HDFS daemons is deprecated.
WARNING: Attempting to execute replacement "hdfs --daemon start" instead.
[hadoop@server03 sbin]$ jps
3383 QuorumPeerMain
4439 JournalNode
4476 Jps
```
Format HDFS
core-site.xml contains the following setting:
```xml
<property>
  <name>hadoop.tmp.dir</name>
  <value>/opt/hadoop/tmp</value>
</property>
```
Create the directory as root first, then change its owner and group to hadoop.
```
[root@server01 ~]# cd /opt
[root@server01 opt]# ll
總用量 0
[root@server01 opt]# mkdir -p hadoop/tmp
[root@server01 opt]# chown -R hadoop:hadoop hadoop/
[root@server01 opt]# ll
總用量 0
drwxr-xr-x. 3 hadoop hadoop 17 8月 11 22:54 hadoop
```
On server01, run hadoop namenode -format to format HDFS.
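For reference, the command looks like this; in Hadoop 3.x the hadoop namenode form is deprecated in favor of hdfs namenode -format, and the JournalNodes started earlier must still be running so the shared edits directory can be initialized:

```bash
# run on server01 as the hadoop user
hdfs namenode -format
```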
Start Hadoop
For the first start, run hadoop-daemon.sh start namenode on server01.
```
[hadoop@server01 tmp]$ hadoop-daemon.sh start namenode
WARNING: Use of this script to start HDFS daemons is deprecated.
WARNING: Attempting to execute replacement "hdfs --daemon start" instead.
[hadoop@server01 tmp]$ jps
24664 NameNode
24700 Jps
```
On server02, run hadoop namenode -bootstrapStandby to copy the metadata from server01.
On server01, run hdfs zkfc -formatZK to format the ZKFC state in ZooKeeper.
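Taken together, the first-start bootstrap sequence described above looks roughly like this; the NameNode on server01 must still be running while server02 bootstraps:

```bash
# server01: start the freshly formatted NameNode
hadoop-daemon.sh start namenode

# server02: copy the NameNode metadata from server01
# (hdfs namenode is the current spelling of the deprecated hadoop namenode)
hdfs namenode -bootstrapStandby

# server01: initialize the HA state for ZKFC in ZooKeeper
hdfs zkfc -formatZK
```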
Once the steps above are done, run stop-all.sh on server01 to stop all of the related services.
Then run start-all.sh on server01 to start the whole cluster.
```
[hadoop@server01 hadoop]$ start-all.sh
WARNING: Attempting to start all Apache Hadoop daemons as hadoop in 10 seconds.
WARNING: This is not a recommended production deployment configuration.
WARNING: Use CTRL-C to abort.
Starting namenodes on [server01 server02]
Starting datanodes
Starting journal nodes [server04 server03 server05]
Starting ZK Failover Controllers on NN hosts [server01 server02]
Starting resourcemanagers on [ server01 server02]
Starting nodemanagers
[hadoop@server01 hadoop]$ jps
6323 DFSZKFailoverController
5977 NameNode
6668 ResourceManager
6797 Jps
```
```
[hadoop@server02 ~]$ jps
6736 ResourceManager
6600 DFSZKFailoverController
6492 NameNode
7007 Jps

[hadoop@server03 current]$ jps
27705 NodeManager
27978 Jps
5531 QuorumPeerMain
27580 JournalNode
27470 DataNode

[hadoop@server04 tmp]$ jps
5104 QuorumPeerMain
26819 NodeManager
26694 JournalNode
27094 Jps
26584 DataNode

[hadoop@server05 tmp]$ jps
4663 QuorumPeerMain
26394 NodeManager
26268 JournalNode
26684 Jps
26159 DataNode
```
Seeing these results, I could not help giving myself a pat on the back; standing up an environment like this is no small feat for a developer. I hit plenty of pitfalls along the way, tracked down answers and fixed them one by one, and finally got everything working. That is a great feeling.
Submitting a MapReduce job to the cluster remotely from IDEA
With the environment up, it is time to put it to work.
Here is the exercise: given temperature readings for a set of dates, find the two days with the highest temperatures in each month, along with those temperatures. Note that a single day may be recorded more than once, but only its highest reading may appear in the output.
Temperature data file:
```
1949-10-01 34c
1949-10-01 38c
1949-10-02 36c
1950-01-01 32c
1950-10-01 37c
1951-12-01 23c
1950-10-02 41c
1950-10-03 27c
1951-07-01 45c
1951-07-02 46c
1951-07-03 47c
```
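Before submitting the job, the data file has to be on HDFS at the input path the driver below reads (/tq/input/data.txt); a sketch, assuming the readings are saved locally as data.txt:

```bash
# run as the hadoop user on any node with HDFS access
hdfs dfs -mkdir -p /tq/input
hdfs dfs -put data.txt /tq/input/data.txt
```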
Solution approach: each whole input record becomes the key emitted by the map phase; group by the year and month inside the key, sort by year, month, and temperature (temperature in descending order), and finally drop the extra records for days that appear more than once before writing the output.
HadoopClient.java
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

public class HadoopClient {
    public static void main(String[] args) throws Exception {
        // 1. configuration
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Configuration conf = new Configuration(true);
        conf.set("mapreduce.app-submission.cross-platform", "true");
        conf.set("mapreduce.job.jar", "D:\\IDEAProject\\hadoopmr\\target\\hadoopmr-1.0-SNAPSHOT.jar");

        // 2. job
        Job job = Job.getInstance(conf);
        job.setJarByClass(HadoopClient.class);
        job.setJobName("FirstMR");

        // 3. input/output paths
        Path input = new Path("/tq/input/data.txt");
        FileInputFormat.addInputPath(job, input);
        Path output = new Path(URI.create("/tq/output/"));
        // delete the output directory if it already exists
        if (output.getFileSystem(conf).exists(output)) {
            output.getFileSystem(conf).delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        // 4. map
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(MyKey.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setPartitionerClass(MyPartitioner.class);
        job.setSortComparatorClass(MySortComparator.class);

        // 5. reduce
        // grouping comparator: one reduce call per (year, month)
        job.setGroupingComparatorClass(MyGroupingComparator.class);
        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(1);
        job.setCombinerKeyGroupingComparatorClass(MyGroupingComparator.class);

        // 6. submit
        job.waitForCompletion(true);
    }
}
```
MyKey.java
```java
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class MyKey implements WritableComparable<MyKey> {
    private int year;
    private int month;
    private int day;
    private int temperature;

    public int getYear() { return year; }
    public void setYear(int year) { this.year = year; }
    public int getMonth() { return month; }
    public void setMonth(int month) { this.month = month; }
    public int getDay() { return day; }
    public void setDay(int day) { this.day = day; }
    public int getTemperature() { return temperature; }
    public void setTemperature(int temperature) { this.temperature = temperature; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(day);
        out.writeInt(temperature);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.month = in.readInt();
        this.day = in.readInt();
        this.temperature = in.readInt();
    }

    @Override
    public int compareTo(MyKey that) {
        // natural ordering: dates in ascending order
        int c1 = Integer.compare(this.getYear(), that.getYear());
        if (c1 == 0) {
            int c2 = Integer.compare(this.getMonth(), that.getMonth());
            if (c2 == 0) {
                // once the date has been compared we are done
                return Integer.compare(this.getDay(), that.getDay());
            }
            return c2;
        }
        return c1;
    }
}
```
MyMapper.java
```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class MyMapper extends Mapper<LongWritable, Text, MyKey, IntWritable> {
    // created once and reused for every record instead of being allocated per call
    MyKey mkey = new MyKey();
    IntWritable mval = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            // value: "1949-10-01 34c"  >>  MyKey
            String[] strs = StringUtils.split(value.toString(), ' ');
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            Date date = sdf.parse(strs[0]);
            Calendar cal = Calendar.getInstance();
            cal.setTime(date);
            mkey.setYear(cal.get(Calendar.YEAR));
            mkey.setMonth(cal.get(Calendar.MONTH) + 1);
            mkey.setDay(cal.get(Calendar.DAY_OF_MONTH));
            int temperature = Integer.parseInt(strs[1].substring(0, strs[1].length() - 1));
            mkey.setTemperature(temperature);
            mval.set(temperature);
            context.write(mkey, mval);
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}
```
MyPartitioner.java
```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<MyKey, IntWritable> {
    @Override
    public int getPartition(MyKey mykey, IntWritable intWritable, int numPartitions) {
        // spread keys across reducers by year
        return mykey.getYear() % numPartitions;
    }
}
```
MySortComparator.java
```java
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MySortComparator extends WritableComparator {

    public MySortComparator() {
        super(MyKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MyKey t1 = (MyKey) a;
        MyKey t2 = (MyKey) b;
        int c1 = Integer.compare(t1.getYear(), t2.getYear());
        if (c1 == 0) {
            int c2 = Integer.compare(t1.getMonth(), t2.getMonth());
            if (c2 == 0) {
                // within a month, sort by temperature in descending order
                return -Integer.compare(t1.getTemperature(), t2.getTemperature());
            }
            return c2;
        }
        return c1;
    }
}
```
MyGroupingComparator.java
```java
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyGroupingComparator extends WritableComparator {

    public MyGroupingComparator() {
        super(MyKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // keys with the same year and month go to the same reduce call
        MyKey t1 = (MyKey) a;
        MyKey t2 = (MyKey) b;
        int c1 = Integer.compare(t1.getYear(), t2.getYear());
        if (c1 == 0) {
            return Integer.compare(t1.getMonth(), t2.getMonth());
        }
        return c1;
    }
}
```
MyReducer.java
```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyReducer extends Reducer<MyKey, IntWritable, Text, IntWritable> {
    Text rkey = new Text();
    IntWritable rval = new IntWritable();

    @Override
    protected void reduce(MyKey key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int flg = 0;
        int day = 0;
        // keys arrive sorted by temperature within the month, e.g.
        // 1970 01 20 34    34
        // 1970 01 12 28    28
        for (IntWritable v : values) {
            // v itself is not needed: the key advances together with the iterator
            if (flg == 0) {
                // hottest day of the month, e.g. 1970-01-20: 34
                rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay());
                rval.set(key.getTemperature());
                context.write(rkey, rval);
                day = key.getDay();
                flg++;
            }
            // skip further records from the same day, then emit the second-hottest day
            if (flg != 0 && day != key.getDay()) {
                rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay());
                rval.set(key.getTemperature());
                context.write(rkey, rval);
                break;
            }
        }
    }
}
```
Note that core-site.xml, hdfs-site.xml, mapred-site.xml, and yarn-site.xml must be placed in the IDEA project's src/main/resources directory, and that directory must be marked as Resources Root.
Add the following to pom.xml:
```xml
<resources>
  <resource>
    <directory>src/main/resources</directory>
    <includes>
      <include>**/*.properties</include>
      <include>**/*.xml</include>
    </includes>
    <filtering>false</filtering>
  </resource>
  <resource>
    <directory>src/main/java</directory>
    <includes>
      <include>**/*.properties</include>
      <include>**/*.xml</include>
    </includes>
    <filtering>false</filtering>
  </resource>
</resources>
```
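Since the driver points mapreduce.job.jar at target/hadoopmr-1.0-SNAPSHOT.jar, the project has to be packaged before every run; assuming a standard Maven setup:

```bash
# run from the project root on the development machine
mvn clean package
```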
With all of this in place, run the client's main class, then open server01:8088 in a browser and check the job under Applications. Once it has finished successfully, view the result file from the command line on server01.
```
[hadoop@server01 ~]$ hdfs dfs -cat /tq/output/part-r-00000
1949-10-1 38
1949-10-2 36
1950-1-1 32
1950-10-2 41
1950-10-1 37
1951-7-3 47
1951-7-2 46
1951-12-1 23
```
The output is correct. Now go back and work through the code for each step at your own pace; it all becomes familiar with practice.