MySQL
一、概述
什么是數據庫 ?
答:數據的倉庫,如:在ATM的示例中我們創建了一個 db 目錄,稱其為數據庫
什么是 MySQL、Oracle、SQLite、Access、MS SQL Server等 ?
答:他們均是一個軟件,都有兩個主要的功能:
- a. 將數據保存到文件或內存
- b. 接收特定的命令,然后對文件進行相應的操作
PS:如果有了以上軟件,無須自己再去創建文件和文件夾,而是直接傳遞 命令 給上述軟件,讓其來進行文件操作,他們統稱為數據庫管理系統(DBMS,Database Management System)
什么是SQL ?
答:上述提到MySQL等軟件可以接受命令,並做出相應的操作,由於命令中可以包含刪除文件、獲取文件內容等眾多操作,對於編寫的命令就是是SQL語句。SQL,是結構化語言(Structured Query Language)的縮寫,SQL是一種專門用來與數據庫通信的語言。
二、下載安裝
MySQL是一個關系型數據庫管理系統,由瑞典MySQL AB 公司開發,目前屬於 Oracle 旗下公司。MySQL 最流行的關系型數據庫管理系統,在 WEB 應用方面MySQL是最好的 RDBMS (Relational Database Management System,關系數據庫管理系統) 應用軟件之一。
想要使用MySQL來存儲並操作數據,則需要做幾件事情:
a. 安裝MySQL服務端
b. 安裝MySQL客戶端
b. 【客戶端】連接【服務端】
c. 【客戶端】發送命令給【服務端MySQL】服務的接受命令並執行相應操作(增刪改查等)
下載 http://dev.mysql.com/downloads/mysql/ 安裝 windows: 點點點 Linux: yum install mysql-server Mac: 點點點
三、數據庫操作
1、顯示數據
SHOW DATABASES;
默認數據庫:
mysql - 用戶權限相關數據
test - 用於用戶測試數據
information_schema - MySQL本身架構相關數據
2、使用數據庫
USE db_name;
3、顯示所有表
SHOW TABLES;
4、用戶授權
用戶管理:
創建用戶 create user '用戶名'@'IP地址' identified by '密碼'; 刪除用戶 drop user '用戶名'@'IP地址'; 修改用戶 rename user '用戶名'@'IP地址'; to '新用戶名'@'IP地址';; 修改密碼 set password for '用戶名'@'IP地址' = Password('新密碼') PS:用戶權限相關數據保存在mysql數據庫的user表中,所以也可以直接對其進行操作(不建議)
授權管理:
show grants for '用戶'@'IP地址' -- 查看權限 grant 權限 on 數據庫.表 to '用戶'@'IP地址' -- 授權 revoke 權限 on 數據庫.表 from '用戶'@'IP地址' -- 取消權限

all privileges 除grant外的所有權限 select 僅查權限 select,insert 查和插入權限 ... usage 無訪問權限 alter 使用alter table alter routine 使用alter procedure和drop procedure create 使用create table create routine 使用create procedure create temporary tables 使用create temporary tables create user 使用create user、drop user、rename user和revoke all privileges create view 使用create view delete 使用delete drop 使用drop table execute 使用call和存儲過程 file 使用select into outfile 和 load data infile grant option 使用grant 和 revoke index 使用index insert 使用insert lock tables 使用lock table process 使用show full processlist select 使用select show databases 使用show databases show view 使用show view update 使用update reload 使用flush shutdown 使用mysqladmin shutdown(關閉MySQL) super 使用change master、kill、logs、purge、master和set global。還允許mysqladmin調試登陸 replication client 服務器位置的訪問 replication slave 由復制從屬使用

對於目標數據庫以及內部其他: 數據庫名.* 數據庫中的所有 數據庫名.表 指定數據庫中的某張表 數據庫名.存儲過程 指定數據庫中的存儲過程 *.* 所有數據庫

用戶名@IP地址 用戶只能在改IP下才能訪問 用戶名@192.168.1.% 用戶只能在改IP段下才能訪問(通配符%表示任意) 用戶名@% 用戶可以再任意IP下訪問(默認IP地址為%)

grant all privileges on db1.tb1 TO '用戶名'@'IP' grant select on db1.* TO '用戶名'@'IP' grant select,insert on *.* TO '用戶名'@'IP' revoke select on db1.tb1 from '用戶名'@'IP'
四、表操作
1、創建表
create table 表名( 列名 類型 是否可以為空, 列名 類型 是否可以為空 )

是否可空,null表示空,非字符串 not null - 不可空 null - 可空

默認值,創建列時可以指定默認值,當插入數據時如果未主動設置,則自動添加默認值 create table tb1( nid int not null defalut 2, num int not null )

自增,如果為某列設置自增列,插入數據時無需設置此列,默認將自增(表中只能有一個自增列) create table tb1( nid int not null auto_increment primary key, num int null ) 或 create table tb1( nid int not null auto_increment, num int null, index(nid) ) 注意:1、對於自增列,必須是索引(含主鍵)。 2、對於自增可以設置步長和起始值 show session variables like 'auto_inc%'; set session auto_increment_increment=2; set session auto_increment_offset=10; shwo global variables like 'auto_inc%'; set global auto_increment_increment=2; set global auto_increment_offset=10;

主鍵,一種特殊的唯一索引,不允許有空值,如果主鍵使用單個列,則它的值必須唯一,如果是多列,則其組合必須唯一。 create table tb1( nid int not null auto_increment primary key, num int null ) 或 create table tb1( nid int not null, num int not null, primary key(nid,num) )

外鍵,一個特殊的索引,只能是指定內容 creat table color( nid int not null primary key, name char(16) not null ) create table fruit( nid int not null primary key, smt char(32) null , color_id int not null, constraint fk_cc foreign key (color_id) references color(nid) )
2、刪除表
drop table 表名
3、清空表
delete from 表名 truncate table 表名
4、修改表
添加列:alter table 表名 add 列名 類型 刪除列:alter table 表名 drop column 列名 修改列: alter table 表名 modify column 列名 類型; -- 類型 alter table 表名 change 原列名 新列名 類型; -- 列名,類型 添加主鍵: alter table 表名 add primary key(列名); 刪除主鍵: alter table 表名 drop primary key; alter table 表名 modify 列名 int, drop primary key; 添加外鍵:alter table 從表 add constraint 外鍵名稱(形如:FK_從表_主表) foreign key 從表(外鍵字段) references 主表(主鍵字段); 刪除外鍵:alter table 表名 drop foreign key 外鍵名稱 修改默認值:ALTER TABLE testalter_tbl ALTER i SET DEFAULT 1000; 刪除默認值:ALTER TABLE testalter_tbl ALTER i DROP DEFAULT;
5、基本數據類型
MySQL的數據類型大致分為:數值、時間和字符串
http://www.runoob.com/mysql/mysql-data-types.html
五、基本操作
1、增

insert into 表 (列名,列名...) values (值,值,值...) insert into 表 (列名,列名...) values (值,值,值...),(值,值,值...) insert into 表 (列名,列名...) select (列名,列名...) from 表
2、刪

delete from 表 delete from 表 where id=1 and name='alex'
3、改

update 表 set name = 'alex' where id>1
4、查

select * from 表 select * from 表 where id > 1 select nid,name,gender as gg from 表 where id > 1
5、其他

1、條件 select * from 表 where id > 1 and name != 'alex' and num = 12; select * from 表 where id between 5 and 16; select * from 表 where id in (11,22,33) select * from 表 where id not in (11,22,33) select * from 表 where id in (select nid from 表) 2、通配符 select * from 表 where name like 'ale%' - ale開頭的所有(多個字符串) select * from 表 where name like 'ale_' - ale開頭的所有(一個字符) 3、限制 select * from 表 limit 5; - 前5行 select * from 表 limit 4,5; - 從第4行開始的5行 select * from 表 limit 5 offset 4 - 從第4行開始的5行 4、排序 select * from 表 order by 列 asc - 根據 “列” 從小到大排列 select * from 表 order by 列 desc - 根據 “列” 從大到小排列 select * from 表 order by 列1 desc,列2 asc - 根據 “列1” 從大到小排列,如果相同則按列2從小到大排序 5、分組 select num from 表 group by num select num,nid from 表 group by num,nid select num,nid from 表 where nid > 10 group by num,nid order nid desc select num,nid,count(*),sum(score),max(score),min(score) from 表 group by num,nid select num from 表 group by num having max(id) > 10 特別的:group by 必須在where之后,order by之前 6、連表 無對應關系則不顯示 select A.num, A.name, B.name from A,B Where A.nid = B.nid 無對應關系則不顯示 select A.num, A.name, B.name from A inner join B on A.nid = B.nid A表所有顯示,如果B中無對應關系,則值為null select A.num, A.name, B.name from A left join B on A.nid = B.nid B表所有顯示,如果B中無對應關系,則值為null select A.num, A.name, B.name from A right join B on A.nid = B.nid 7、組合 組合,自動處理重合 select nickname from A union select name from B 組合,不處理重合 select nickname from A union all select name from B
pymsql
pymsql是Python中操作MySQL的模塊,其使用方法和MySQLdb幾乎相同。
一、下載安裝:
pip3 install pymysql
二、使用
1、執行SQL
#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql # 創建連接 conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1') # 創建游標 cursor = conn.cursor() # 執行SQL,並返回收影響行數 effect_row = cursor.execute("update hosts set host = '1.1.1.2'") # 執行SQL,並返回受影響行數 #effect_row = cursor.execute("update hosts set host = '1.1.1.2' where nid > %s", (1,)) # 執行SQL,並返回受影響行數 #effect_row = cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)]) # 提交,不然無法保存新建或者修改的數據 conn.commit() # 關閉游標 cursor.close() # 關閉連接 conn.close()
2、獲取新創建數據自增ID
#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1') cursor = conn.cursor() cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)]) conn.commit() cursor.close() conn.close() # 獲取最新自增ID new_id = cursor.lastrowid
3、獲取查詢數據
#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1') cursor = conn.cursor() cursor.execute("select * from hosts") # 獲取第一行數據 row_1 = cursor.fetchone() # 獲取前n行數據 # row_2 = cursor.fetchmany(3) # 獲取所有數據 # row_3 = cursor.fetchall() conn.commit() cursor.close() conn.close()
注:在fetch數據時按照順序進行,可以使用cursor.scroll(num,mode)來移動游標位置,如:
- cursor.scroll(1,mode='relative') # 相對當前位置移動
- cursor.scroll(2,mode='absolute') # 相對絕對位置移動
4、fetch數據類型
關於默認獲取的數據是元祖類型,如果想要或者字典類型的數據,即:
#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1') # 游標設置為字典類型 cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) r = cursor.execute("call p1()") result = cursor.fetchone() conn.commit() cursor.close() conn.close()
SQLAchemy
SQLAlchemy是Python編程語言下的一款ORM框架,該框架建立在數據庫API之上,使用關系對象映射進行數據庫操作,簡言之便是:將對象轉換成SQL,然后使用數據API執行SQL並獲取執行結果。
SQLAlchemy本身無法操作數據庫,其必須以來pymsql等第三方插件,Dialect用於和數據API進行交流,根據配置文件的不同調用不同的數據庫API,從而實現對數據庫的操作,如:
MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 更多詳見:http://docs.sqlalchemy.org/en/latest/dialects/index.html
一、底層處理
使用 Engine/ConnectionPooling/Dialect 進行數據庫操作,Engine使用ConnectionPooling連接數據庫,然后再通過Dialect執行SQL語句。
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5) # 執行SQL # cur = engine.execute( # "INSERT INTO hosts (host, color_id) VALUES ('1.1.1.22', 3)" # ) # 新插入行自增ID # cur.lastrowid # 執行SQL # cur = engine.execute( # "INSERT INTO hosts (host, color_id) VALUES(%s, %s)",[('1.1.1.22', 3),('1.1.1.221', 3),] # ) # 執行SQL # cur = engine.execute( # "INSERT INTO hosts (host, color_id) VALUES (%(host)s, %(color_id)s)", # host='1.1.1.99', color_id=3 # ) # 執行SQL # cur = engine.execute('select * from hosts') # 獲取第一行數據 # cur.fetchone() # 獲取第n行數據 # cur.fetchmany(3) # 獲取所有數據 # cur.fetchall()
二、ORM功能使用
使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有組件對數據進行操作。根據類創建對象,對象轉換成SQL,執行SQL。
1、創建表

#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5) Base = declarative_base() # 創建單表 class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32)) extra = Column(String(16)) __table_args__ = ( UniqueConstraint('id', 'name', name='uix_id_name'), Index('ix_id_name', 'name', 'extra'), ) # 一對多 class Favor(Base): __tablename__ = 'favor' nid = Column(Integer, primary_key=True) caption = Column(String(50), default='red', unique=True) class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) favor_id = Column(Integer, ForeignKey("favor.nid")) # 多對多 class Group(Base): __tablename__ = 'group' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) port = Column(Integer, default=22) class Server(Base): __tablename__ = 'server' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False) class ServerToGroup(Base): __tablename__ = 'servertogroup' nid = Column(Integer, primary_key=True, autoincrement=True) server_id = Column(Integer, ForeignKey('server.id')) group_id = Column(Integer, ForeignKey('group.id')) def init_db(): Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine)
ForeignKeyConstraint(['other_id'], ['othertable.other_id']),
2、操作表

#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5) Base = declarative_base() # 創建單表 class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32)) extra = Column(String(16)) __table_args__ = ( UniqueConstraint('id', 'name', name='uix_id_name'), Index('ix_id_name', 'name', 'extra'), ) def __repr__(self): return "%s-%s" %(self.id, self.name) # 一對多 class Favor(Base): __tablename__ = 'favor' nid = Column(Integer, primary_key=True) caption = Column(String(50), default='red', unique=True) def __repr__(self): return "%s-%s" %(self.nid, self.caption) class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) favor_id = Column(Integer, ForeignKey("favor.nid")) # 與生成表結構無關,僅用於查詢方便 favor = relationship("Favor", backref='pers') # 多對多 class ServerToGroup(Base): __tablename__ = 'servertogroup' nid = Column(Integer, primary_key=True, autoincrement=True) server_id = Column(Integer, ForeignKey('server.id')) group_id = Column(Integer, ForeignKey('group.id')) group = relationship("Group", backref='s2g') server = relationship("Server", backref='s2g') class Group(Base): __tablename__ = 'group' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) port = Column(Integer, default=22) # group = relationship('Group',secondary=ServerToGroup,backref='host_list') class Server(Base): __tablename__ = 'server' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False) def init_db(): Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine) Session = sessionmaker(bind=engine) session = Session()
- 增
obj = Users(name="alex0", extra='sb') session.add(obj) session.add_all([ Users(name="alex1", extra='sb'), Users(name="alex2", extra='sb'), ]) session.commit()
- 刪
session.query(Users).filter(Users.id > 2).delete() session.commit()
- 改
session.query(Users).filter(Users.id > 2).update({"name" : "099"}) session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False) session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate") session.commit()
- 查
ret = session.query(Users).all() ret = session.query(Users.name, Users.extra).all() ret = session.query(Users).filter_by(name='alex').all() ret = session.query(Users).filter_by(name='alex').first()
- 其他
# 條件 ret = session.query(Users).filter_by(name='alex').all() ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all() ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all() ret = session.query(Users).filter(Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all() from sqlalchemy import and_, or_ ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all() ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all() ret = session.query(Users).filter( or_( Users.id < 2, and_(Users.name == 'eric', Users.id > 3), Users.extra != "" )).all() # 通配符 ret = session.query(Users).filter(Users.name.like('e%')).all() ret = session.query(Users).filter(~Users.name.like('e%')).all() # 限制 ret = session.query(Users)[1:2] # 排序 ret = session.query(Users).order_by(Users.name.desc()).all() ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 分組 from sqlalchemy.sql import func ret = session.query(Users).group_by(Users.extra).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() # 連表 ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() ret = session.query(Person).join(Favor).all() ret = session.query(Person).join(Favor, isouter=True).all() # 組合 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all() q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all()
更多功能參見文檔,猛擊這里下載PDF

#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import create_engine,and_,or_,func,Table from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String,ForeignKey,UniqueConstraint,DateTime from sqlalchemy.orm import sessionmaker,relationship Base = declarative_base() #生成一個SqlORM 基類 # 服務器賬號和組 # HostUser2Group = Table('hostuser_2_group',Base.metadata, # Column('hostuser_id',ForeignKey('host_user.id'),primary_key=True), # Column('group_id',ForeignKey('group.id'),primary_key=True), # ) # 用戶和組關系表,用戶可以屬於多個組,一個組可以有多個人 UserProfile2Group = Table('userprofile_2_group',Base.metadata, Column('userprofile_id',ForeignKey('user_profile.id'),primary_key=True), Column('group_id',ForeignKey('group.id'),primary_key=True), ) # 程序登陸用戶和服務器賬戶,一個人可以有多個服務器賬號,一個服務器賬號可以給多個人用 UserProfile2HostUser= Table('userprofile_2_hostuser',Base.metadata, Column('userprofile_id',ForeignKey('user_profile.id'),primary_key=True), Column('hostuser_id',ForeignKey('host_user.id'),primary_key=True), ) class Host(Base): __tablename__='host' id = Column(Integer,primary_key=True,autoincrement=True) hostname = Column(String(64),unique=True,nullable=False) ip_addr = Column(String(128),unique=True,nullable=False) port = Column(Integer,default=22) def __repr__(self): return "<id=%s,hostname=%s, ip_addr=%s>" %(self.id, self.hostname, self.ip_addr) class HostUser(Base): __tablename__ = 'host_user' id = Column(Integer,primary_key=True) AuthTypes = [ (u'ssh-passwd',u'SSH/Password'), (u'ssh-key',u'SSH/KEY'), ] # auth_type = Column(ChoiceType(AuthTypes)) auth_type = Column(String(64)) username = Column(String(64),unique=True,nullable=False) password = Column(String(255)) host_id = Column(Integer,ForeignKey('host.id')) # groups = relationship('Group', # secondary=HostUser2Group, # backref='host_list') __table_args__ = (UniqueConstraint('host_id','username', name='_host_username_uc'),) def __repr__(self): return "<id=%s,name=%s>" %(self.id,self.username) class Group(Base): __tablename__ = 'group' id = Column(Integer,primary_key=True) name = Column(String(64),unique=True,nullable=False) def __repr__(self): return "<id=%s,name=%s>" %(self.id,self.name) class UserProfile(Base): __tablename__ = 'user_profile' id = Column(Integer,primary_key=True) username = Column(String(64),unique=True,nullable=False) password = Column(String(255),nullable=False) # host_list = relationship('HostUser', # secondary=UserProfile2HostUser, # backref='userprofiles') # groups = relationship('Group', # secondary=UserProfile2Group, # backref='userprofiles') def __repr__(self): return "<id=%s,name=%s>" %(self.id,self.username) class AuditLog(Base): __tablename__ = 'audit_log' id = Column(Integer,primary_key=True) userprofile_id = Column(Integer,ForeignKey('user_profile.id')) hostuser_id = Column(Integer,ForeignKey('host_user.id')) action_choices2 = [ (u'cmd',u'CMD'), (u'login',u'Login'), (u'logout',u'Logout'), ] action_type = Column(ChoiceType(action_choices2)) #action_type = Column(String(64)) cmd = Column(String(255)) date = Column(DateTime) # user_profile = relationship("UserProfile") #bind_host = relationship("BindHost") engine = create_engine("mysql+pymsql://root:123@localhost:3306/stupid_jumpserver",echo=False) Base.metadata.create_all(engine) #創建所有表結構
Paramiko
paramiko模塊,基於SSH用於連接遠程服務器並執行相關操作。
一、安裝
pip3 install paramiko
二、使用
SSHClient
用於連接遠程服務器並執行基本命令
基於用戶名密碼連接:
import paramiko # 創建SSH對象 ssh = paramiko.SSHClient() # 允許連接不在know_hosts文件中的主機 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 連接服務器 ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', password='123') # 執行命令 stdin, stdout, stderr = ssh.exec_command('ls') # 獲取命令結果 result = stdout.read() # 關閉連接 ssh.close()

import paramiko transport = paramiko.Transport(('hostname', 22)) transport.connect(username='wupeiqi', password='123') ssh = paramiko.SSHClient() ssh._transport = transport stdin, stdout, stderr = ssh.exec_command('df') print stdout.read() transport.close()
基於公鑰密鑰連接:
import paramiko private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa') # 創建SSH對象 ssh = paramiko.SSHClient() # 允許連接不在know_hosts文件中的主機 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 連接服務器 ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', key=private_key) # 執行命令 stdin, stdout, stderr = ssh.exec_command('df') # 獲取命令結果 result = stdout.read() # 關閉連接 ssh.close()

import paramiko private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa') transport = paramiko.Transport(('hostname', 22)) transport.connect(username='wupeiqi', pkey=private_key) ssh = paramiko.SSHClient() ssh._transport = transport stdin, stdout, stderr = ssh.exec_command('df') transport.close()

import paramiko from io import StringIO key_str = """-----BEGIN RSA PRIVATE KEY----- MIIEpQIBAAKCAQEAq7gLsqYArAFco02/55IgNg0r7NXOtEM3qXpb/dabJ5Uyky/8 NEHhFiQ7deHIRIuTW5Zb0kD6h6EBbVlUMBmwJrC2oSzySLU1w+ZNfH0PE6W6fans H80whhuc/YgP+fjiO+VR/gFcqib8Rll5UfYzf5H8uuOnDeIXGCVgyHQSmt8if1+e 7hn1MVO1Lrm9Fco8ABI7dyv8/ZEwoSfh2C9rGYgA58LT1FkBRkOePbHD43xNfAYC tfLvz6LErMnwdOW4sNMEWWAWv1fsTB35PAm5CazfKzmam9n5IQXhmUNcNvmaZtvP c4f4g59mdsaWNtNaY96UjOfx83Om86gmdkKcnwIDAQABAoIBAQCnDBGFJuv8aA7A ZkBLe+GN815JtOyye7lIS1n2I7En3oImoUWNaJEYwwJ8+LmjxMwDCtAkR0XwbvY+ c+nsKPEtkjb3sAu6I148RmwWsGncSRqUaJrljOypaW9dS+GO4Ujjz3/lw1lrxSUh IqVc0E7kyRW8kP3QCaNBwArYteHreZFFp6XmtKMtXaEA3saJYILxaaXlYkoRi4k8 S2/K8aw3ZMR4tDCOfB4o47JaeiA/e185RK3A+mLn9xTDhTdZqTQpv17/YRPcgmwz zu30fhVXQT/SuI0sO+bzCO4YGoEwoBX718AWhdLJFoFq1B7k2ZEzXTAtjEXQEWm6 01ndU/jhAasdfasdasdfasdfa3eraszxqwefasdfadasdffsFIfAsjQb4HdkmHuC OeJrJOd+CYvdEeqJJNnF6AbHyYHIECkj0Qq1kEfLOEsqzd5nDbtkKBte6M1trbjl HtJ2Yb8w6o/q/6Sbj7wf/cW3LIYEdeVCjScozVcQ9R83ea05J+QOAr4nAoGBAMaq UzLJfLNWZ5Qosmir2oHStFlZpxspax/ln7DlWLW4wPB4YJalSVovF2Buo8hr8X65 lnPiE41M+G0Z7icEXiFyDBFDCtzx0x/RmaBokLathrFtI81UCx4gQPLaSVNMlvQA 539GsubSrO4LpHRNGg/weZ6EqQOXvHvkUkm2bDDJAoGATytFNxen6GtC0ZT3SRQM WYfasdf3xbtuykmnluiofasd2sfmjnljkt7khghmghdasSDFGQfgaFoKfaawoYeH C2XasVUsVviBn8kPSLSVBPX4JUfQmA6h8HsajeVahxN1U9e0nYJ0sYDQFUMTS2t8 RT57+WK/0ONwTWHdu+KnaJECgYEAid/ta8LQC3p82iNAZkpWlGDSD2yb/8rH8NQg 9tjEryFwrbMtfX9qn+8srx06B796U3OjifstjJQNmVI0qNlsJpQK8fPwVxRxbJS/ pMbNICrf3sUa4sZgDOFfkeuSlgACh4cVIozDXlR59Z8Y3CoiW0uObEgvMDIfenAj 98pl3ZkCgYEAj/UCSni0dwX4pnKNPm6LUgiS7QvIgM3H9piyt8aipQuzBi5LUKWw DlQC4Zb73nHgdREtQYYXTu7p27Bl0Gizz1sW2eSgxFU8eTh+ucfVwOXKAXKU5SeI +MbuBfUYQ4if2N/BXn47+/ecf3A4KgB37Le5SbLDddwCNxGlBzbpBa0= -----END RSA PRIVATE KEY-----""" private_key = paramiko.RSAKey(file_obj=StringIO(key_str)) transport = paramiko.Transport(('10.0.1.40', 22)) transport.connect(username='wupeiqi', pkey=private_key) ssh = paramiko.SSHClient() ssh._transport = transport stdin, stdout, stderr = ssh.exec_command('df') result = stdout.read() transport.close() print(result)
SFTPClient
用於連接遠程服務器並執行上傳下載
基於用戶名密碼上傳下載:
import paramiko transport = paramiko.Transport(('hostname',22)) transport.connect(username='wupeiqi',password='123') sftp = paramiko.SFTPClient.from_transport(transport) # 將location.py 上傳至服務器 /tmp/test.py sftp.put('/tmp/location.py', '/tmp/test.py') # 將remove_path 下載到本地 local_path sftp.get('remove_path', 'local_path') transport.close()
基於公鑰密鑰上傳下載:
import paramiko private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa') transport = paramiko.Transport(('hostname', 22)) transport.connect(username='wupeiqi', pkey=private_key ) sftp = paramiko.SFTPClient.from_transport(transport) # 將location.py 上傳至服務器 /tmp/test.py sftp.put('/tmp/location.py', '/tmp/test.py') # 將remove_path 下載到本地 local_path sftp.get('remove_path', 'local_path') transport.close()

#!/usr/bin/env python # -*- coding:utf-8 -*- import paramiko import uuid class SSHConnection(object): def __init__(self, host='172.16.103.191', port=22, username='wupeiqi',pwd='123'): self.host = host self.port = port self.username = username self.pwd = pwd self.__k = None def create_file(self): file_name = str(uuid.uuid4()) with open(file_name,'w') as f: f.write('sb') return file_name def run(self): self.connect() self.upload('/home/wupeiqi/tttttttttttt.py') self.rename('/home/wupeiqi/tttttttttttt.py', '/home/wupeiqi/ooooooooo.py) self.close() def connect(self): transport = paramiko.Transport((self.host,self.port)) transport.connect(username=self.username,password=self.pwd) self.__transport = transport def close(self): self.__transport.close() def upload(self,target_path): # 連接,上傳 file_name = self.create_file() sftp = paramiko.SFTPClient.from_transport(self.__transport) # 將location.py 上傳至服務器 /tmp/test.py sftp.put(file_name, target_path) def rename(self, old_path, new_path): ssh = paramiko.SSHClient() ssh._transport = self.__transport # 執行命令 cmd = "mv %s %s" % (old_path, new_path,) stdin, stdout, stderr = ssh.exec_command(cmd) # 獲取命令結果 result = stdout.read() def cmd(self, command): ssh = paramiko.SSHClient() ssh._transport = self.__transport # 執行命令 stdin, stdout, stderr = ssh.exec_command(command) # 獲取命令結果 result = stdout.read() return result ha = SSHConnection() ha.run()
# 對於更多限制命令,需要在系統中設置 /etc/sudoers Defaults requiretty Defaults:cmdb !requiretty

import paramiko import uuid class SSHConnection(object): def __init__(self, host='192.168.11.61', port=22, username='alex',pwd='alex3714'): self.host = host self.port = port self.username = username self.pwd = pwd self.__k = None def run(self): self.connect() pass self.close() def connect(self): transport = paramiko.Transport((self.host,self.port)) transport.connect(username=self.username,password=self.pwd) self.__transport = transport def close(self): self.__transport.close() def cmd(self, command): ssh = paramiko.SSHClient() ssh._transport = self.__transport # 執行命令 stdin, stdout, stderr = ssh.exec_command(command) # 獲取命令結果 result = stdout.read() return result def upload(self,local_path, target_path): # 連接,上傳 sftp = paramiko.SFTPClient.from_transport(self.__transport) # 將location.py 上傳至服務器 /tmp/test.py sftp.put(local_path, target_path) ssh = SSHConnection() ssh.connect() r1 = ssh.cmd('df') ssh.upload('s2.py', "/home/alex/s7.py") ssh.close()
堡壘機
堡壘機執行流程:
- 管理員為用戶在服務器上創建賬號(將公鑰放置服務器,或者使用用戶名密碼)
- 用戶登陸堡壘機,輸入堡壘機用戶名密碼,現實當前用戶管理的服務器列表
- 用戶選擇服務器,並自動登陸
- 執行操作並同時將用戶操作記錄
注:配置.brashrc實現ssh登陸后自動執行腳本,如:/usr/bin/python /home/wupeiqi/menu.py

#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import create_engine, and_, or_, func, Table from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, DateTime from sqlalchemy.orm import sessionmaker, relationship Base = declarative_base() # 生成一個SqlORM 基類 class Host(Base): __tablename__ = 'host' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False) ip_addr = Column(String(128), unique=True, nullable=False) port = Column(Integer, default=22) class HostUser(Base): __tablename__ = 'host_user' id = Column(Integer, primary_key=True, autoincrement=True) username = Column(String(64), unique=True, nullable=False) AuthTypes = [ ('p', 'SSH/Password'), ('r', 'SSH/KEY'), ] auth_type = Column(String(16)) cert = Column(String(255)) host_id = Column(Integer, ForeignKey('host.id')) __table_args__ = ( UniqueConstraint('host_id', 'username', name='_host_username_uc'), ) class Group(Base): __tablename__ = 'group' id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(64), unique=True, nullable=False) class UserProfile(Base): __tablename__ = 'user_profile' id = Column(Integer, primary_key=True, autoincrement=True) username = Column(String(64), unique=True, nullable=False) password = Column(String(255), nullable=False) class Group2UserProfile(Base): __tablename__ = 'group_2_user_profile' id = Column(Integer, primary_key=True, autoincrement=True) user_profile_id = Column(Integer, ForeignKey('user_profile.id')) group_id = Column(Integer, ForeignKey('group.id')) __table_args__ = ( UniqueConstraint('user_profile_id', 'group_id', name='ux_user_group'), ) class Group2HostUser(Base): __tablename__ = 'group_2_host_user' id = Column(Integer, primary_key=True, autoincrement=True) host_user_id = Column(Integer, ForeignKey('host_user.id')) group_id = Column(Integer, ForeignKey('group.id')) __table_args__ = ( UniqueConstraint('group_id', 'host_user_id', name='ux_group_host_user'), ) class UserProfile2HostUser(Base): __tablename__ = 'user_profile_2_host_user' id = Column(Integer, primary_key=True, autoincrement=True) host_user_id = Column(Integer, ForeignKey('host_user.id')) user_profile_id = Column(Integer, ForeignKey('user_profile.id')) __table_args__ = ( UniqueConstraint('user_profile_id', 'host_user_id', name='ux_user_host_user'), ) class AuditLog(Base): __tablename__ = 'audit_log' id = Column(Integer, primary_key=True, autoincrement=True) action_choices2 = [ (u'cmd', u'CMD'), (u'login', u'Login'), (u'logout', u'Logout'), ] action_type = Column(String(16)) cmd = Column(String(255)) date = Column(DateTime) user_profile_id = Column(Integer, ForeignKey('user_profile.id')) host_user_id = Column(Integer, ForeignKey('host_user.id'))
實現過程
1、前戲
import paramiko import sys import os import socket import select import getpass tran = paramiko.Transport(('10.211.55.4', 22,)) tran.start_client() tran.auth_password('wupeiqi', '123') # 打開一個通道 chan = tran.open_session() # 獲取一個終端 chan.get_pty() # 激活器 chan.invoke_shell() ######### # 利用sys.stdin,肆意妄為執行操作 # 用戶在終端輸入內容,並將內容發送至遠程服務器 # 遠程服務器執行命令,並將結果返回 # 用戶終端顯示內容 ######### chan.close() tran.close()
2、肆意妄為(一)
import paramiko import sys import os import socket import select import getpass from paramiko.py3compat import u tran = paramiko.Transport(('10.211.55.4', 22,)) tran.start_client() tran.auth_password('wupeiqi', '123') # 打開一個通道 chan = tran.open_session() # 獲取一個終端 chan.get_pty() # 激活器 chan.invoke_shell() while True: # 監視用戶輸入和服務器返回數據 # sys.stdin 處理用戶輸入 # chan 是之前創建的通道,用於接收服務器返回信息 readable, writeable, error = select.select([chan, sys.stdin, ],[],[],1) if chan in readable: try: x = u(chan.recv(1024)) if len(x) == 0: print('\r\n*** EOF\r\n') break sys.stdout.write(x) sys.stdout.flush() except socket.timeout: pass if sys.stdin in readable: inp = sys.stdin.readline() chan.sendall(inp) chan.close() tran.close()

#!/usr/bin/env python # -*- coding:utf-8 -*- import paramiko import sys import os import socket import select import getpass from paramiko.py3compat import u default_username = getpass.getuser() username = input('Username [%s]: ' % default_username) if len(username) == 0: username = default_username hostname = input('Hostname: ') if len(hostname) == 0: print('*** Hostname required.') sys.exit(1) tran = paramiko.Transport((hostname, 22,)) tran.start_client() default_auth = "p" auth = input('Auth by (p)assword or (r)sa key[%s] ' % default_auth) if len(auth) == 0: auth = default_auth if auth == 'r': default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_rsa') path = input('RSA key [%s]: ' % default_path) if len(path) == 0: path = default_path try: key = paramiko.RSAKey.from_private_key_file(path) except paramiko.PasswordRequiredException: password = getpass.getpass('RSA key password: ') key = paramiko.RSAKey.from_private_key_file(path, password) tran.auth_publickey(username, key) else: pw = getpass.getpass('Password for %s@%s: ' % (username, hostname)) tran.auth_password(username, pw) # 打開一個通道 chan = tran.open_session() # 獲取一個終端 chan.get_pty() # 激活器 chan.invoke_shell() while True: # 監視用戶輸入和服務器返回數據 # sys.stdin 處理用戶輸入 # chan 是之前創建的通道,用於接收服務器返回信息 readable, writeable, error = select.select([chan, sys.stdin, ],[],[],1) if chan in readable: try: x = u(chan.recv(1024)) if len(x) == 0: print('\r\n*** EOF\r\n') break sys.stdout.write(x) sys.stdout.flush() except socket.timeout: pass if sys.stdin in readable: inp = sys.stdin.readline() chan.sendall(inp) chan.close() tran.close()
3、肆意妄為(二)
import paramiko import sys import os import socket import select import getpass import termios import tty from paramiko.py3compat import u tran = paramiko.Transport(('10.211.55.4', 22,)) tran.start_client() tran.auth_password('wupeiqi', '123') # 打開一個通道 chan = tran.open_session() # 獲取一個終端 chan.get_pty() # 激活器 chan.invoke_shell() # 獲取原tty屬性 oldtty = termios.tcgetattr(sys.stdin) try: # 為tty設置新屬性 # 默認當前tty設備屬性: # 輸入一行回車,執行 # CTRL+C 進程退出,遇到特殊字符,特殊處理。 # 這是為原始模式,不認識所有特殊符號 # 放置特殊字符應用在當前終端,如此設置,將所有的用戶輸入均發送到遠程服務器 tty.setraw(sys.stdin.fileno()) chan.settimeout(0.0) while True: # 監視 用戶輸入 和 遠程服務器返回數據(socket) # 阻塞,直到句柄可讀 r, w, e = select.select([chan, sys.stdin], [], [], 1) if chan in r: try: x = u(chan.recv(1024)) if len(x) == 0: print('\r\n*** EOF\r\n') break sys.stdout.write(x) sys.stdout.flush() except socket.timeout: pass if sys.stdin in r: x = sys.stdin.read(1) if len(x) == 0: break chan.send(x) finally: # 重新設置終端屬性 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty) chan.close() tran.close()

#!/usr/bin/env python # -*- coding:utf-8 -*- import paramiko import sys import os import socket import getpass import termios import tty import select from paramiko.py3compat import u def interactive_shell(chan): # 獲取原tty屬性 oldtty = termios.tcgetattr(sys.stdin) try: # 為tty設置新屬性 # 默認當前tty設備屬性: # 輸入一行回車,執行 # CTRL+C 進程退出,遇到特殊字符,特殊處理。 # 這是為原始模式,不認識所有特殊符號 # 放置特殊字符應用在當前終端,如此設置,將所有的用戶輸入均發送到遠程服務器 tty.setraw(sys.stdin.fileno()) tty.setcbreak(sys.stdin.fileno()) chan.settimeout(0.0) while True: r, w, e = select.select([chan, sys.stdin], [], []) if chan in r: try: x = u(chan.recv(1024)) if len(x) == 0: sys.stdout.write('\r\n*** EOF\r\n') break sys.stdout.write(x) sys.stdout.flush() except socket.timeout: pass if sys.stdin in r: x = sys.stdin.read(1) if len(x) == 0: break chan.send(x) finally: # 重新設置終端屬性 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty) def run(): hostname = input('請輸入主機名: ') tran = paramiko.Transport((hostname, 22,)) tran.start_client() username = input('請輸入用戶名: ') auth = input('請輸入密碼進行驗證(p) 或 (r)sa Key進行驗證?') if auth == 'r': path = input('請輸入RSA key 路徑: ') try: key = paramiko.RSAKey.from_private_key_file(path) except paramiko.PasswordRequiredException: password = getpass.getpass('RSA key password: ') key = paramiko.RSAKey.from_private_key_file(path, password) tran.auth_publickey(username, key) else: pw = getpass.getpass('請輸入密碼 %s@%s: ' % (username, hostname)) tran.auth_password(username, pw) # 打開一個通道 chan = tran.open_session() # 獲取一個終端 chan.get_pty() # 激活器 chan.invoke_shell() interactive_shell(chan) chan.close() tran.close() if __name__ == '__main__': run()

#!/usr/bin/env python # -*- coding:utf-8 -*- import paramiko import sys import os import socket import getpass import termios import tty import select from paramiko.py3compat import u def interactive_shell(chan): # 獲取原tty屬性 oldtty = termios.tcgetattr(sys.stdin) try: # 為tty設置新屬性 # 默認當前tty設備屬性: # 輸入一行回車,執行 # CTRL+C 進程退出,遇到特殊字符,特殊處理。 # 這是為原始模式,不認識所有特殊符號 # 放置特殊字符應用在當前終端,如此設置,將所有的用戶輸入均發送到遠程服務器 tty.setraw(sys.stdin.fileno()) tty.setcbreak(sys.stdin.fileno()) chan.settimeout(0.0) while True: r, w, e = select.select([chan, sys.stdin], [], []) if chan in r: try: x = u(chan.recv(1024)) if len(x) == 0: sys.stdout.write('\r\n*** EOF\r\n') break sys.stdout.write(x) sys.stdout.flush() except socket.timeout: pass if sys.stdin in r: x = sys.stdin.read(1) if len(x) == 0: break chan.send(x) finally: # 重新設置終端屬性 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty) def run(): db_dict = { 'c1.salt.com': { 'root': {'user': 'root', 'auth': 'r', "cert": 'key路徑'}, 'alex': {'user': 'alex', 'auth': 'p', "cert": '密碼'}, }, 'c2.salt.com': { 'alex': {'user': 'alex', 'auth': 'p', "cert": '密碼'}, }, } for row in db_dict.keys(): print(row) hostname = input('請選擇主機: ') tran = paramiko.Transport((hostname, 22,)) tran.start_client() for item in db_dict[hostname].keys(): print(item) username = input('請輸入用戶: ') user_dict = db_dict[hostname][username] if username['auth'] == 'r': key = paramiko.RSAKey.from_private_key_file(user_dict['cert']) tran.auth_publickey(username, key) else: pw = user_dict['cert'] tran.auth_password(username, pw) # 打開一個通道 chan = tran.open_session() # 獲取一個終端 chan.get_pty() # 激活器 chan.invoke_shell() interactive_shell(chan) chan.close() tran.close() if __name__ == '__main__': run()

#!/usr/bin/env python # -*- coding:utf-8 -*- import paramiko import sys import os import socket import getpass import termios import tty import select from paramiko.py3compat import u def interactive_shell(chan): # 獲取原tty屬性 oldtty = termios.tcgetattr(sys.stdin) try: # 為tty設置新屬性 # 默認當前tty設備屬性: # 輸入一行回車,執行 # CTRL+C 進程退出,遇到特殊字符,特殊處理。 # 這是為原始模式,不認識所有特殊符號 # 放置特殊字符應用在當前終端,如此設置,將所有的用戶輸入均發送到遠程服務器 tty.setraw(sys.stdin.fileno()) tty.setcbreak(sys.stdin.fileno()) chan.settimeout(0.0) log = open('handle.log', 'a+', encoding='utf-8') flag = False temp_list = [] while True: r, w, e = select.select([chan, sys.stdin], [], []) if chan in r: try: x = u(chan.recv(1024)) if len(x) == 0: sys.stdout.write('\r\n*** EOF\r\n') break # 如果用戶上一次點擊的是tab鍵,則獲取返回的內容寫入在記錄中 if flag: if x.startswith('\r\n'): pass else: temp_list.append(x) flag = False sys.stdout.write(x) sys.stdout.flush() except socket.timeout: pass if sys.stdin in r: # 讀取用戶在終端數據每一個字符 x = sys.stdin.read(1) if len(x) == 0: break # 如果用戶點擊TAB鍵 if x == '\t': flag = True else: # 未點擊TAB鍵,則將每個操作字符記錄添加到列表中,以便之后寫入文件 temp_list.append(x) # 如果用戶敲回車,則將操作記錄寫入文件 if x == '\r': log.write(''.join(temp_list)) log.flush() temp_list.clear() chan.send(x) finally: # 重新設置終端屬性 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty) def run(): db_dict = { 'c1.salt.com': { 'root': {'user': 'root', 'auth': 'r', "cert": 'key路徑'}, 'alex': {'user': 'alex', 'auth': 'p', "cert": '密碼'}, }, 'c2.salt.com': { 'root': {'user': 'root', 'auth': 'r', "cert": 'key路徑'}, 'alex': {'user': 'alex', 'auth': 'p', "cert": '密碼'}, }, } for row in db_dict.keys(): print(row) hostname = input('請選擇主機: ') tran = paramiko.Transport((hostname, 22,)) tran.start_client() for item in db_dict[hostname].keys(): print(item) username = input('請輸入用戶: ') user_dict = db_dict[hostname][username] if username['auth'] == 'r': key = paramiko.RSAKey.from_private_key_file(user_dict['cert']) tran.auth_publickey(username, key) else: pw = user_dict['cert'] tran.auth_password(username, pw) # 打開一個通道 chan = tran.open_session() # 獲取一個終端 chan.get_pty() # 激活器 chan.invoke_shell() interactive_shell(chan) chan.close() tran.close() if __name__ == '__main__': run()
更多參見:paramoko源碼 https://github.com/paramiko/paramiko
Alex堡壘機:http://www.cnblogs.com/alex3714/articles/5286889.html