
[Python] Hangul Unicode 10

GreenBNN 2023. 6. 9. 14:35

crawl_wiki2.py

 import urllib.parse
 import bs4
 import requests


 start_url = "https://ko.wikipedia.org/wiki/Special:Random"
 target_url = "https://ko.wikipedia.org/wiki/Philosophy"

 def find_first_link(url):
     response = requests.get(url)
     html = response.text
     soup = bs4.BeautifulSoup(html, "html.parser")
     body_text = soup.get_text()
     # This div contains the article's body
     # (June 2017 Note: Body nested in two div tags)
     content_root = soup.find(id="mw-content-text")
     content_div = content_root.find(class_="mw-parser-output") if content_root else None

     # stores the first link found in the article, if the article contains no
     # links this value will remain None
     article_link = None
     if content_div is None:
         return None, body_text  # can't call find_all on None; report "no link found"

     # Find all the direct children of content_div that are paragraphs
     for element in content_div.find_all("p", recursive=False):
         # Find the first anchor tag that's a direct child of a paragraph.
         # It's important to only look at direct children, because other types
         # of link, e.g. footnotes and pronunciation, could come before the
         # first link to an article. Those other link types aren't direct
         # children though, they're in divs of various classes.
         if element.find("a", recursive=False):
             article_link = element.find("a", recursive=False).get('href')
             break

     if not article_link:
         return None, body_text

     # Build a full url from the relative article_link url
     first_link = urllib.parse.urljoin('https://ko.wikipedia.org/', article_link)
     return first_link, body_text


 def continue_crawl(search_history, target_url, max_steps=25):
     if search_history[-1] == target_url:
         print("We've found the target article!")
         return False
     elif len(search_history) > max_steps:
         print("The search has gone on suspiciously long, aborting search!")
         return False
     elif search_history[-1] in search_history[:-1]:
         print("We've arrived at an article we've already seen, aborting search!")
         return False
     else:
         return True
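
Before wiring the crawl into a database, the two functions above can be exercised with a small standalone driver. The sketch below is an illustrative addition (the file name crawl_test.py is made up); it assumes find_first_link always returns a (link, body_text) pair, as in the listing above.

 # crawl_test.py -- illustrative driver sketch, not part of the original files
 import time
 from crawl_wiki2 import find_first_link, continue_crawl

 start_url = "https://ko.wikipedia.org/wiki/Special:Random"
 target_url = "https://ko.wikipedia.org/wiki/Philosophy"

 article_chain = [start_url]
 while continue_crawl(article_chain, target_url):
     first_link, _body_text = find_first_link(article_chain[-1])
     if first_link is None:
         print("We've arrived at an article with no links, aborting search!")
         break
     print(article_chain[-1], '->', first_link)
     article_chain.append(first_link)
     time.sleep(2)  # slow down so we don't hammer Wikipedia's servers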

 

 scrape_wiki_random_save.py

 # coding: utf-8
 from crawl_wiki2 import find_first_link, continue_crawl

 #main
 import sys
 import time
 import urllib.parse # to decode any URL string
 from datetime import datetime
 from wiki_sqldb import connect_db, insert_data

 conn = connect_db('wiki.sqlite3')

 start_url = "https://ko.wikipedia.org/wiki/Special:Random"
 target_url = "https://ko.wikipedia.org/wiki/한글"
 input('start:%s' % start_url)
 input('target:%s' % target_url)

 article_chain = [start_url]

 while continue_crawl(article_chain, target_url):
     print('current last', article_chain[-1])
     from_url = urllib.parse.unquote(article_chain[-1])
     print('decoded', from_url)  # show the Hangul form of the percent-encoded URL

     try:
         first_link, body_text = find_first_link(article_chain[-1])
         if first_link is None:
             print("We've arrived at an article with no links, aborting search!")
             break

         article_chain.append(first_link) # Update the last url
         time.sleep(2) # Slow things down so as to not hammer Wikipedia's servers

         a_record = {} #dict()
         a_record['from_url']  = from_url
         a_record['to_url'] = urllib.parse.unquote(first_link)
         a_record['text'] = body_text #XXX collected here but not yet stored by insert_data
         datestr = '{:%Y%m%d%H%M%S}'.format(datetime.now())
         a_record['time'] = datestr #XXX
         a_record['updated'] = 'N' #XXX check if there is the url in the table.
         insert_data(conn, a_record)
         input(a_record)  # pause and show the record just inserted

     except Exception as e:
         print(e, file=sys.stderr)
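
The urllib.parse.unquote calls above are what turn the percent-encoded Korean URLs back into readable Hangul: the target https://ko.wikipedia.org/wiki/한글 travels as UTF-8 bytes escaped as %XX. A quick round-trip sketch, added here for illustration and not part of the original post:

 # unquote_demo.py -- illustrative sketch of the percent-encoding round trip
 import urllib.parse

 encoded = "https://ko.wikipedia.org/wiki/%ED%95%9C%EA%B8%80"
 print(urllib.parse.unquote(encoded))   # https://ko.wikipedia.org/wiki/한글

 # quote() goes the other way; safe=':/' keeps the scheme and path separators intact
 decoded = "https://ko.wikipedia.org/wiki/한글"
 print(urllib.parse.quote(decoded, safe=':/'))  # .../wiki/%ED%95%9C%EA%B8%80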

 

 

 wiki_sqldb.py  

 #coding: utf-8
 import sqlite3
 import sys

 def connect_db(dbname):
     conn = sqlite3.connect(dbname)
     return conn

 def create_table(conn):
     cur = conn.cursor()
     tsql = '''create table if not exists wiki_links \
              (from_url text not null, to_url text not null, \
              time text not null, updated text); '''
     print(tsql)
     cur.execute(tsql)
     conn.commit()
     return

 def insert_data(conn, col_dict):
     cur = conn.cursor()
     try:
         # Parameterized placeholders let sqlite3 quote/escape the values safely,
         # even when a URL contains quote characters.
         a_sql = '''insert into wiki_links (from_url, to_url, time, updated)
                  values(?, ?, ?, ?);'''
         params = (col_dict['from_url'], col_dict['to_url'],
                   col_dict['time'], col_dict['updated'])
         print(a_sql, params)
         cur.execute(a_sql, params)
     except Exception as e:
         print(e, file=sys.stderr)
     conn.commit()
     return

 if __name__ == '__main__':
     try:
         conn = connect_db('wiki.sqlite3')
         create_table(conn)
         conn.close()
     except Exception as e:
         print(e)
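
To see what the crawler has stored, the wiki_links rows can be read back with a few lines of sqlite3. This reader is a sketch added for illustration and assumes the table layout created by create_table above.

 # read_wiki_links.py -- illustrative sketch: dump the crawl history saved so far
 import sqlite3

 conn = sqlite3.connect('wiki.sqlite3')
 cur = conn.cursor()
 cur.execute('select from_url, to_url, time, updated from wiki_links order by time;')
 for from_url, to_url, time_str, updated in cur.fetchall():
     # URLs were stored already decoded with urllib.parse.unquote, so print them as-is
     print(time_str, from_url, '->', to_url, updated)
 conn.close()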