ConcurrentHashMap源碼解析-Java7


目錄

一.ConcurrentHashMap的模型圖

二.源碼分析-類定義

  2.1 極簡ConcurrentHashMap定義

  2.2 Segment內部類

  2.3 HashEntry內部類

  2.4 ConcurrentHashMap的重要常量

三.常用接口源碼分析

  3.1 ConcurrentHashMap構造方法

  3.2 map.put操作

  3.3 創建新segment

  3.4 segment.put操作

  3.5 segment.rehash擴容

  3.6 map.get操作

  3.7 map.remove操作

  3.8 map.size操作

 

  原文地址:https://www.cnblogs.com/-beyond/p/13157083.html

一.ConcurrentHashMap的模型圖

  本文所有的介紹都是針對Java7而言!!!!!

  下面是ConcurrentHashMap的結構圖,ConcurrentHashMap有一個segments數組,每個segment中又有一個table數組,該數組的每個元素時HashEntry類型。

   

  可以簡單的理解為ConcurrentHashMap是多個HashMap組成,每一個HashMap是一個segment,就如傳說中一樣,ConcurrentHashMap使用分段鎖的方式,這個“段”就是segment。

  ConcurrentHashMap之所以能夠保證並發安全,是因為支持對不同segment的並發修改操作,比如兩個線程同時修改ConcurrentHashMap,一個線程修改第一個segment的數據,另一個線程修改第二個segment的數據,兩個線程可以並發修改,不會出現並發問題;但是多個線程同一個segment進行並發修改,則需要先獲取該segment的鎖后再修改,修改完后釋放鎖,然后其他要修改的線程再進行修改。

  那么,ConcurrentHashMap可以支持多少並發量(寫)呢?這個也就是問,ConcurrentHashMap最多能支持多少線程並發修改,其實也就是segment的數量,只要修改segment的這些線程不是修改同一個segment,那么這些線程就可以並行執行,這也就是ConcurrentHashMap的並發量(segment的數量)。

  注意,ConcurrentHashMap創建完成后,也就是segment的數量、並發級別已經確定,則segment的數量和並發級別都不能再改變了,即使發生擴容,也是segment中的table進行擴容,segment的數量保持不變。

 

二.源碼分析-類定義

2.1 極簡ConcurrentHashMap定義

  下面是省略了大部分代碼的ConcurrentHashMap定義:

package java.util.concurrent;

import java.util.AbstractMap;
import java.util.concurrent.locks.ReentrantLock;

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {

    final Segment<K, V>[] segments;

    /**
     * segment分段的定義
     */
    static final class Segment<K, V> extends ReentrantLock implements Serializable {

        transient volatile HashEntry<K, V>[] table;
    }

    /**
     * 存放的元素節點
     */
    static final class HashEntry<K, V> {

    }
}

 

2.2 Segment內部類

  ConcurrentHashMap內部有一個segments屬性,是Segment類型的數組,Segment類和HashMap很相似(Java7的HashMap),維持一個數組,數組的每個元素是HashEntry類型(可以理解為HashMap的節點),當發生hash沖突后,則使用拉鏈法(頭插法)來解決沖突。

  而Segment數組的定義如下,省略了方法:

static final class Segment<K, V> extends ReentrantLock implements Serializable {
    static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
    private static final long serialVersionUID = 2249069246763182397L;
    
    // segment的負載因子(segments數組中的所有segment負載因子都相同)
    final float loadFactor;
    
    // segment中保存元素的數組
    transient volatile HashEntry<K, V>[] table;
   
    // 該segment中的元素個數
    transient int count;
    
    // modify count,該segment被修改的次數
    transient int modCount;
    
    // segment的擴容閾值
    transient int threshold;
}

  注意每個Segment都有負載因子、以及擴容閾值,但是后面可以看到,其實segments數組中的每一個segment,負載因子和擴容閾值都相同,因為創建的時候,都是使用0號segment的負載因子和擴容閾值。

 

2.3 HashEntry內部類

  Segment類中有一個table數組,table數組的每個元素都是HashEntry類型,和HashMap的Entry類似,源碼如下:(省略了方法)

/**
 * map中每個元素的類型
 */
static final class HashEntry<K, V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K, V> next;
}

 

2.4 ConcurrentHashMap的一些常量

  ConcurrentHashMap中有很多常量,

// 默認初始容量
static final int DEFAULT_INITIAL_CAPACITY = 16;

// 默認的負載因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;

// 默認的並發級別(同時支持多少線程並發修改)
// 因為JDK7中ConcurrentHashMap中是用分段鎖實現並發,不同分段的數據可以進行並發操作,同一個段的數據不能同時修改
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

// 最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;

// 每一個分段的數組容量初始值
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

// 最多能有多少個segment
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

// 嘗試對整個map進行操作(比如說統計map的元素數量),可能需要鎖定全部segment;
// 這個常量表示鎖定所有segment前,嘗試的次數
static final int RETRIES_BEFORE_LOCK = 2;

  

三.常用接口源碼分析

3.1 ConcurrentHashMap構造方法

  ConcurrentHashMap有多個構造方法,但是內部其實都是調用同一個進行創建:

public ConcurrentHashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(int initialCapacity) {
    this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}

/**
 * 統一調用的構造方法
 *
 * @param initialCapacity  初始容量
 * @param loadFactor       負載因子
 * @param concurrencyLevel 並發級別
 */
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    // 校驗參數合法性
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) {
        throw new IllegalArgumentException();
    }

    // 對並發級別進行調整,不允許超過segment的數量(超過segment其實是沒有意義的)
    if (concurrencyLevel > MAX_SEGMENTS) {
        concurrencyLevel = MAX_SEGMENTS;
    }

    // 根據concurrencyLevel確定sshift和ssize的值
    int ssize = 1; // ssize是表示segment的數量,ssize是不小於concurrencyLevel的數,並且是2的n次方
    int sshift = 0;// sshift是ssize轉換為2進制后的位數,比如ssize為16(1000),則sshift為4
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 比如concurrencyLevel默認為16,走完循環,sshift為4,ssize為16
    // 如果concurrentLevel為8,則sshift為3,ssize為8

    // segment的shift偏移量
    this.segmentShift = 32 - sshift;
    // segment掩碼
    this.segmentMask = ssize - 1;

    // 對傳入的初始容量進行調整(不允許大於最大容量)
    if (initialCapacity > MAXIMUM_CAPACITY) {
        initialCapacity = MAXIMUM_CAPACITY;
    }

    // 假設傳入的容量為128,並發級別為16,則initialCapacity為128,ssize為16
    int c = initialCapacity / ssize;
    // c可以理解為根據傳入的初始容量,計算出每個segment中的數組容量
    if (c * ssize < initialCapacity) {
        ++c;
    }

    // cap表示的是每個segment中的數組容量
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    // 如果默認的每個segment中的數組長度小於上面計算出來的每個segment的數組長度,則將容量翻倍
    while (cap < c) {
        cap <<= 1;
    }

    // 創建一個segment,作為segments數組的第一個segment
    Segment<K, V> s0 = new Segment<K, V>(loadFactor, (int) (cap * loadFactor), new HashEntry[cap]);

    // 創建segments數組
    Segment<K, V>[] ss = (Segment<K, V>[]) new Segment[ssize];

    // 將s0賦值給segments數組的第一個元素(偏移量為0)
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]

    // 復制segment數組
    this.segments = ss;
}

/**
 * 傳入map,將map中的元素加入到ConcurrentHashMap中
 * 注意使用默認的負載因子(0.75)和默認的並發級別(16)
 * 初始容量取map容量和默認容量的較大值
 */
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1, DEFAULT_INITIAL_CAPACITY),
            DEFAULT_LOAD_FACTOR,
            DEFAULT_CONCURRENCY_LEVEL);
    putAll(m);
}

  

3.2 map.put操作

  map.put,map就是指ConcurrentHashMap,其實就是確定HashEntry應該放入哪個segment中的哪個位置,所以可分為3步:

  1.首先需要確定放入哪個segment;

  2.確定segment后,再確定HashEntry應該放入segment的哪個位置;

  3.進行覆蓋覆蓋或者插入。

/**
 * put操作,注意key或者value為null時,會拋出NPE
 */
@SuppressWarnings("unchecked")
public V put(K key, V value) {
    Segment<K, V> s;
    if (value == null) {
        throw new NullPointerException();
    }

    // 計算key的hash
    int hash = hash(key);

    // hash值右移shift位后 與 mask掩碼進行取與,確定數據應該放到哪個segments數組的哪一個segment中
    int j = (hash >>> segmentShift) & segmentMask;

    // 判斷計算出的segment數組位置上的segment是否為null,如果為null,則進行創建segment
    if ((s = (Segment<K, V>) UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) {
        s = ensureSegment(j);
    }

    // 創建segment后,將數據put到segment中,調用的segment.put()
    return s.put(key, hash, value, false);
}

  

3.3 創建新segment

  上面put的時候,如果發現segment為null,則會進行調用ensureSegment進行創建segment,代碼如下:

/**
 * 擴容(創建)segment
 *
 * @param k 需要擴容或者需要創建的segment位置
 * @return 返回擴容后的segment
 */
@SuppressWarnings("unchecked")
private Segment<K, V> ensureSegment(int k) {
    final Segment<K, V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // 傳入的index,對應的偏移量
    Segment<K, V> seg;

    // 判斷是否需要擴容或者創建segment,如果獲取到segment不為null,則返回segment
    if ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) {
        Segment<K, V> proto = ss[0]; // “原型設計模式”,使用segments數組的0號位置segment
        int cap = proto.table.length;// 0號segment的table長度
        float lf = proto.loadFactor; // 0號segment的負載因子
        int threshold = (int) (cap * lf); // 0號segment的擴容閾值

        // 創建一個HashEntry的數組,數組容量和0號segment相同
        HashEntry<K, V>[] tab = (HashEntry<K, V>[]) new HashEntry[cap];

        // 再次檢測,指定的segment是不是為null,如果為null才進行下一步操作
        if ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
            // 創建segment
            Segment<K, V> s = new Segment<K, V>(lf, threshold, tab);

            // 使用CAS將新創建的segment填入指定的位置
            while ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) {
                    break;
                }
            }
        }
    }

    // 返回新增的segment
    return seg;
}

  上面需要注意的是:

  1.創建segment中的table數組時,是使用0號segment的table屬性(長度、負載因子、閾值);

  2.創建segment前會進行再check,check發現segment的確為null時,再進行創建segment;

  3.利用CAS來將創建的segment填入segments數組中;

 

3.4 segment.put操作

  當確定HashEntry應該放到哪個segment后,就開始調用segment的put方法,如下:

/**
 * 在確定應該存放到那個segment后,就segment.put()將k-v存入segment中
 *
 * @param key          put的key
 * @param hash         hash(key)的值
 * @param value        put的value
 * @param onlyIfAbsent true:key對應的Entry不進行覆蓋,false:key對應的entry存在與否,都進行put覆蓋
 * @return
 */
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 先獲取鎖(ReentrantLock,內部使用非公平鎖)
    HashEntry<K, V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K, V>[] tab = table;

        // 根據hash值計算出在segment的table中的位置
        int index = (tab.length - 1) & hash;

        // 定位到segment的table的index位置(第一個節點)
        HashEntry<K, V> first = entryAt(tab, index);

        // 從第一個節點開始遍歷
        for (HashEntry<K, V> e = first; ; ) {
            // 節點不為空,則判斷是否key是否相同(相同HashEntry)
            if (e != null) {
                K k;
                // 比較是否key是否相等(判斷put的key是否已經存在)
                if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
                    // 如果key已經存在,則進行覆蓋,如果onlyIsAbsent為false(不關心key對應的Entry是否存在)
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆蓋舊值,修改計數加1
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            } else {
                // 頭插法,將put的k-v創建新HashEntry,放到first的前面
                if (node != null) {
                    node.setNext(first);
                } else {
                    node = new HashEntry<K, V>(hash, key, value, first);
                }

                // segment中table元素數量加1
                int c = count + 1;

                // 加1后的size大於擴容閾值,且數組的長度小於最大容量,則進行rehash
                if (c > threshold && tab.length < MAXIMUM_CAPACITY) {
                    // 擴容,並進行rehash
                    rehash(node);
                } else {
                    // 將節點放入數組中的指定位置
                    setEntryAt(tab, index, node);
                }

                // 修改次數加一,修改segment的table元素個數
                ++modCount;
                count = c;

                // 舊值為null
                oldValue = null;
                break;
            }
        }
    } finally {
        // 釋放鎖
        unlock();
    }
    return oldValue;
}

  梳理一下,大致在做下面幾件事:

  1.先獲取鎖(ReetrantLock,內部使用非公平鎖NonFairSync);

  2.獲取到鎖后根據hash計算出在table的位置;

  3.遍歷table的HashEntry的鏈表,如果有相同key的,則進行覆蓋,如果沒有,在進行頭插法;

  4.插入鏈表后,確定是否需要擴容,需要則執行rehash;

  5.修改計數(count、modCount),並且釋放鎖。

 

3.5 segment.rehash擴容

  segment擴容時,會將該segment的容量擴容為之前的2倍,並且將各位置的鏈表節點元素進行rehash。

/**
 * 將segment的table容量擴容一倍(newCap=oldCap*2),注意只會擴容該node所在的segment
 *
 * @param node segment[i]->鏈表的頭結點
 */
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K, V> node) {
    HashEntry<K, V>[] oldTable = table;
    int oldCapacity = oldTable.length;

    // 新容量為舊容量的2倍
    int newCapacity = oldCapacity << 1;

    // 設置新的擴容閾值
    threshold = (int) (newCapacity * loadFactor);

    // 申請新數組,數組長度為新容量值
    HashEntry<K, V>[] newTable = (HashEntry<K, V>[]) new HashEntry[newCapacity];

    // 循環遍歷segment的數組(舊數組)
    int sizeMask = newCapacity - 1; // 新的掩碼
    for (int i = 0; i < oldCapacity; i++) {
        // 獲取第i個位置的HashEntry節點,如果該節點為null,則該位置為空,不作處理
        HashEntry<K, V> e = oldTable[i];
        if (e != null) {
            HashEntry<K, V> next = e.next;

            // 計算新位置
            int idx = e.hash & sizeMask;

            // next為null,表示該位置只有一個節點,直接將節點復制到新位置
            if (next == null) {   //  Single node on list
                newTable[idx] = e;
            } else { // Reuse consecutive sequence at same slot
                HashEntry<K, V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K, V> last = next; last != null; last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;
                // 從頭結點開始,開始將節點拷貝到新數組中
                for (HashEntry<K, V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K, V> n = newTable[k];
                    newTable[k] = new HashEntry<K, V>(h, p.key, v, n);
                }
            }
        }
    }

    // 將頭結點添加到segment的table中
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

  為啥擴容的時候沒有加鎖呀?

  其實已經加鎖了,只不過不是在rehash中加鎖!!!因為rehash是在map.put中調用,put的時候已經加鎖了,所以rehash中不用加鎖。

  

3.6 map.get操作

  get操作,先定位到segment,然后定位到segment的具體位置,進行獲取

/**
 * 從ConcurrentHashMap中獲取key對應的value,不需要加鎖
 */
public V get(Object key) {
    Segment<K, V> s;
    HashEntry<K, V>[] tab;

    // 計算key的hash
    int h = hash(key);

    // 計算key處於哪一個segment中(index)
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;

    // 獲取數組中該位置的segment,如果該segment的table不為空,那么就繼續在segment中查找,否則返回null,因為未找到
    if ((s = (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {

        // tab指向segment的table數組,通過hash計算定位到table數組的位置(然后開始遍歷鏈表)
        HashEntry<K, V> e;
        for (e = (HashEntry<K, V>) UNSAFE.getObjectVolatile(tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            // 判斷key和hash是否匹配,匹配則證明找到要查找的數據,將數據返回
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    
    return null;
}

  

3.7 map.remove操作

   刪除節點,和get的流程差不多,先定位到segment,然后確定segment的哪個位置(哪條鏈表),遍歷鏈表,找到后進行刪除。

/**
 * 刪除map中key對應的元素
 */
public V remove(Object key) {
    // 計算key的hash
    int hash = hash(key);

    // 根據hash確定segment
    Segment<K, V> s = segmentForHash(hash);

    // 調用segment.remove進行刪除
    return s == null ? null : s.remove(key, hash, null);
}

/**
 * 刪除segment中key對應的hashEntry
 * 如果傳入的value不為空,則會在value匹配的時候進行刪除,否則不操作
 */
final V segmeng.remove(Object key, int hash, Object value) {
    // 獲取鎖失敗,則不斷自旋嘗試獲取鎖
    if (!tryLock()) {
        scanAndLock(key, hash);
    }

    V oldValue = null;
    try {
        HashEntry<K, V>[] tab = table;
        // 定位到segment中table的哪個位置
        int index = (tab.length - 1) & hash;
        HashEntry<K, V> e = entryAt(tab, index);
        HashEntry<K, V> pred = null;

        // 遍歷鏈表
        while (e != null) {
            K k;
            HashEntry<K, V> next = e.next;
            // 如果key和hash都匹配
            if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
                V v = e.value;
                // 如果沒有傳入value,則直接刪除該節點
                // 如果傳入了value,比如調用的map.remove(key,value),則要value匹配才會刪除,否則不操作
                if (value == null || value == v || value.equals(v)) {
                    if (pred == null) { // 頭結點就是要找刪除的元素,next為null,則將null賦值數組的該位置
                        setEntryAt(tab, index, next);
                    } else { // 
                        pred.setNext(next);
                    }

                    // 修改次數加一,map數量減一
                    ++modCount;
                    --count;
                    oldValue = v;
                }
                break;
            }

            // 不匹配時,pred保存當前一次檢測的節點,e指向下一個節點
            pred = e;
            e = next;
        }
    } finally {
        unlock();// 釋放鎖
    }
    return oldValue;
}

  

3.8 map.size操作

  ConcurrentHashMap的size(),需要統計每一個segment中的元素數量(也就是count值),因為不同的segment允許並發修改,統計過程中可能會出現修改操作,導致統計不准確,所以要想准確統計整個map的元素數量,可以這樣做:通過加鎖的方式來解決(將所有的segment都加鎖),這樣就能保證元素不會變化了,這是我們的想法。

  而ConcurrentHashMap是這樣解決的,先嘗試不加鎖進行統計兩次,這兩次統計,不止會統計每個segment的元素數量,還會統計每個segment的modCount(修改次數),進行匯總;如果兩次統計的modCount總量相同,也就說明兩次統計期間沒有修改操作,證明統計的元素總量正確;如果兩次modCount總量不相同,表示有修改操作,則進行重試,如果重試后,發現還是不准確(modCount不匹配),那么就嘗試為所有的segment加鎖,再進行統計。

  源碼如下:

/**
 * 獲取ConcurrentHashMap中的元素個數,如果元素個數超過Integer.MAX_VALUE,則返回Integer.MAX_VALUE
 */
public int size() {
    final Segment<K, V>[] segments = this.segments;
    int size;           // 返回元素數量(統計結果->元素的總量)
    boolean overflow;   // 標志是否溢出(是否超過Integer.MAX_VALUE)
    long sum;           // 所有segment的modCount總量次數(整個map的修改次數)
    long last = 0L;     // previous sum,上一輪統計的modCount總量
    int retries = -1;   // 記錄重試的次數

    try {
        // 此處for循環主要用於控制重試
        for (; ; ) {
            // 重試的次數如果達到RETRIES_BEFORE_LOCK,則強制獲取所有segment的鎖
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j) {
                    ensureSegment(j).lock();
                    // 強制獲取segment的table第i個位置,並加鎖
                }
            }

            sum = 0L;
            size = 0;
            overflow = false;
            // 開始對segments中的每一個segment中進行統計
            for (int j = 0; j < segments.length; ++j) {
                // 獲取第j個segment
                Segment<K, V> seg = segmentAt(segments, j);
                // 如果segment不為空,則進行統計
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    // size累加
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }

            // 如果本次統計的modCount總量和上次一樣,則表示在這兩次統計之間沒有進行過修改,則不再重試
            if (sum == last) {
                break;
            }
            // 記錄本次統計的modCount總量
            last = sum;
        }
    } finally {
        // 判斷是否加了鎖(如果retries大於RETRIES_BEFORE_LOCK),則證明加了鎖,於是進行釋放鎖
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM