什麼是ETL?
讓GPT大大來教教我們www
ETL是資料倉儲領域中一個重要的概念,全稱為Extract-Transform-Load,中文可譯為"抽取-轉換-載入"。ETL的作用是將來自不同來源的資料抽取出來,經過清理、轉換、整合等處理後,最終將處理好的資料載入到資料倉儲或其他單一的資料存放區中,為進一步的資料分析做準備。
其中包含三個部分:
- 抽取(Extract):從各種來源系統(如關係型資料庫、檔案等)中抽取出所需的資料。
- 轉換(Transform):對抽取出的原始資料進行清理、轉換、整合等處理,使它們能夠被載入到目標系統。例如解決不一致的問題、轉換資料格式、合併重複資料等。
-載入(Load):將處理好的資料載入到目標資料倉儲或其他資料存放區中,為後續的資料分析、商業智能等做準備。
ETL過程通常是自動化的,透過工具或自行開發的程式來執行。Python同樣可以用於開發ETL流程,利用如Pandas、SQLAlchemy等庫進行資料處理。
恩,笨笨的我就把它當作是,對於文檔處理的一個過程吧。那既然提到要處理文檔,python在進行資料處理的,csv、execl、資料庫這些名詞就浮出來了,那本篇想來寫一下這部分。
import csv
with open("./DataFile/file.csv", mode="r", newline="", encoding="utf8") as f:
csv_data = csv.reader(f)
for i in csv_data:
print(i)
print(i[1].title())
import csv
data = [
['a', 'b', 'c'],
['1', '2', '3'],
['x', 'y', 'z']
]
with open("./OutPutFile/new_file.csv", mode="w", newline="", encoding="utf8") as f:
csv_writer = csv.writer(f, delimiter=",")
for row in data:
csv_writer.writerow(row)
首先要來了解一下,excel中下方的工作表,在英文中是worksheet
要先安裝指令
> pip install openpyxl
from openpyxl import load_workbook
wb = load_workbook("./DataFile/Dodgers.xlsx")
result = []
ws = wb.worksheets[0] # 第一個工作表
for i in ws.iter_rows(): # 選擇行
result.append([j.value for j in i]) # 選擇列
print(result,end="\n")
# TODO: 計算全肥打
sum = 0
for i in result[1:]:
sum+=int(i[11])
print(f"the total homerun:{sum}")
讀csv轉成excel
from openpyxl import Workbook
import csv
data_rows = []
with open("./DataFile/file.csv", mode="r", newline="", encoding="utf8") as f:
csv_data = csv.reader(f)
for i in csv_data:
data_rows.append(i)
wb = Workbook()
ws = wb.active
ws.title = "my_file" # 工作表平稱
# 工作表頁籤底色
ws.sheet_properties.tabColor="1072BA"
for i in data_rows:
ws.append(i)
wb.save("./OutputFile/my_file.xlsx")
這邊以sqlite3做示範,因為sqlite3本身就是python的核心liberay之一。
這邊直接將CRUD寫在同在一起,再視需求去刪除/下註解(不要傻傻直接複製貼上就執行歐)
import sqlite3
# NOTE:建立db
# sqlite3.connect(":memory:") 會將dn建立在快取記憶體,那當電腦關機時,這個db就會消失
conn = sqlite3.connect("datafile.db")
cursor = conn.cursor()
# 新增資料庫
cursor.execute("""
create table people (
id integer primary key,
name text,
count integer
)
""")
conn.commit() # 執行sql
# NOTE:新增資料
cursor.execute("""
insert into people (name, count) values (?, ?)
""", ("alice", 30))
conn.commit() # 執行sql
# NOTE:取資料(這樣的寫法可以防止sql injection)
result = cursor.execute("""select * from people where name = :username""", {"username":"bob"})
print(result.fetchmany(1)) # 取x筆資料
# NOTE:更新資料
cursor.execute("""
update people set count = 25 where name = 'bob'
""")
conn.commit() # 執行sql
result = cursor.execute("""select * from people""")
print(result.fetchall()) # 取所有資料
# NOTE:刪除資料
cursor.execute("""
delete from people where name = 'bob'
""")
conn.commit() # 執行sql
conn.close() # 關閉資料
ORM(Object-Relational Mapping)是一種程式設計技術,用於實現物件導向程式設計語言與關聯式資料庫的互操作。其主要目的是通過一個物件虛擬層,將資料庫中的表映射為程式設計語言中的物件,開發人員可以直接使用面向對象的概念來操作資料庫,而不需要編寫大量的 SQL 語句。
安裝指令
> pip install sqlalchemy
使用了 SQLAlchemy 的 Core API,通過創建表格對象來操作數據庫
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
from sqlalchemy.orm import sessionmaker
# NOTE: 根據需求調整
class CFG:
database_name = "datafile.db"
database_server = "sqlite"
# 数据库配置
DATABASE_URL = f"{CFG.database_server}:///{CFG.database_name}"
# 定义模型
engine = create_engine(DATABASE_URL)
metadata = MetaData()
# 創建資料表
people = Table("people", metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
Column("count", Integer)
)
# 创建表
metadata.create_all(engine)
# 初始化数据库
def initialize_database():
Session = sessionmaker(bind=engine)
session = Session()
# NOTE: 插入一条数据
new_person = {"name": "Bob", "count": 40}
session.execute(people.insert(), new_person)
session.commit()
# NOTE: 無條件查询
result = session.query(people).all()
for i in result: print(i)
# NOTE: 有條件查询
result = session.query(people).filter_by(name="Alice").all()
for i in result: print(i)
session.close()
# 执行初始化
if __name__ == "__main__":
initialize_database()
那接下來,引入sqlalchemy
使用了 ORM 功能,通過定義對象和類來映射數據庫表格
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# 定义基类
Base = declarative_base()
# NOTE: 根據需求調整
class CFG:
database_name = "datafile.db"
database_server = "sqlite"
class People(Base):
__tablename__ = "people"
id = Column(Integer, primary_key=True)
name = Column(String)
count = Column(Integer)
def __repr__(self):
return f"<People(id={self.id}, name={self.name}, count={self.count})>"
# 数据库配置
DATABASE_URL = f"{CFG.database_server}:///{CFG.database_name}"
# 创建数据库引擎
engine = create_engine(DATABASE_URL)
Base.metadata.create_all(engine)
# 初始化数据库
def initialize_database():
Session = sessionmaker(bind=engine)
session = Session()
new1 = People(name="Jane", count=5)
new2 = People(name="Mark", count=10)
session.add(new1)
session.add(new2)
session.commit()
# NOTE: 無條件查询
result = session.query(people).all()
for i in result: print(i)
session.close()
# 执行初始化
if __name__ == "__main__":
initialize_database()
🥰以上是本文所分享的內容。如果您發現任何錯誤或遺漏,請不吝賜教。