starter-kit/bots/task/test-github
2019-08-25 15:21:02 +00:00

197 lines
6.4 KiB
Python
Executable file

#!/usr/bin/env python3
# This file is part of Cockpit.
#
# Copyright (C) 2017 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <http://www.gnu.org/licenses/>.
ADDRESS = ("127.0.0.8", 9898)
import ctypes
import fnmatch
import json
import os
import signal
import shutil
import sys
import tempfile
import time
import unittest
import urllib.parse
BASE = os.path.dirname(__file__)
sys.path.insert(1, os.path.join(BASE, ".."))
from task import cache
from task import github
def mockServer():
# Data used by the above handler
data = {
"count": 0,
}
issues =[{ "number": "5", "state": "open", "created_at": "2011-04-22T13:33:48Z" },
{ "number": "6", "state": "closed", "closed_at": "2011-04-21T13:33:48Z" },
{ "number": "7", "state": "open" }]
import http.server
class Handler(http.server.BaseHTTPRequestHandler):
def replyData(self, value, headers={ }, status=200):
self.send_response(status)
for name, content in headers.items():
self.send_header(name, content)
self.end_headers()
self.wfile.write(value.encode('utf-8'))
self.wfile.flush()
def replyJson(self, value, headers={ }, status=200):
headers["Content-type"] = "application/json"
self.server.data["count"] += 1
self.replyData(json.dumps(value), headers=headers, status=status)
def do_GET(self):
parsed = urllib.parse.urlparse(self.path)
if parsed.path == "/count":
self.replyJson(self.server.data["count"])
elif parsed.path == "/issues":
self.replyJson(issues)
elif parsed.path == "/test/user":
if self.headers.get("If-None-Match") == "blah":
self.replyData("", status=304)
else:
self.replyJson({ "user": "blah" }, headers={ "ETag": "blah" })
elif parsed.path == "/test/user/modified":
if self.headers.get("If-Modified-Since") == "Thu, 05 Jul 2012 15:31:30 GMT":
self.replyData("", status=304)
else:
self.replyJson({ "user": "blah" }, headers={ "Last-Modified": "Thu, 05 Jul 2012 15:31:30 GMT" })
elif parsed.path == "/orgs/cockpit-project/teams":
self.replyJson([
{ "name": github.TEAM_CONTRIBUTORS, "id": 1 },
{ "name": "Other team", "id": 2 }
])
elif parsed.path == "/teams/1/members":
self.replyJson([
{ "login": "one" },
{ "login": "two" },
{ "login": "three" }
])
else:
self.send_error(404, 'Mock Not Found: ' + parsed.path)
def do_DELETE(self):
parsed = urllib.parse.urlparse(self.path)
if parsed.path == '/issues/7':
del issues[-1]
self.replyJson({})
else:
self.send_error(404, 'Mock Not Found: ' + parsed.path)
httpd = http.server.HTTPServer(ADDRESS, Handler)
httpd.data = data
child = os.fork()
if child != 0:
httpd.server_close()
return child
# prctl(PR_SET_PDEATHSIG, SIGTERM)
try:
libc = ctypes.CDLL('libc.so.6')
libc.prctl(1, 15)
except OSError:
pass
httpd.serve_forever()
os._exit(1)
def mockKill(child):
os.kill(child, signal.SIGTERM)
os.waitpid(child, 0)
class TestLogger(object):
def __init__(self):
self.data = ""
# Yes, we open the file each time
def write(self, value):
self.data = self.data + value
class TestGitHub(unittest.TestCase):
def setUp(self):
self.child = mockServer()
self.temp = tempfile.mkdtemp()
def tearDown(self):
mockKill(self.child)
shutil.rmtree(self.temp)
def testCache(self):
api = github.GitHub("http://127.0.0.8:9898", cacher=cache.Cache(self.temp))
values = api.get("/test/user")
cached = api.get("/test/user")
self.assertEqual(json.dumps(values), json.dumps(cached))
count = api.get("/count")
self.assertEqual(count, 1)
def testLog(self):
api = github.GitHub("http://127.0.0.8:9898", cacher=cache.Cache(self.temp))
api.log = TestLogger()
api.get("/test/user")
api.cache.mark(time.time() + 1)
api.get("/test/user")
expect = '127.0.0.8:9898 - - * "GET /test/user HTTP/1.1" 200 -\n' + \
'127.0.0.8:9898 - - * "GET /test/user HTTP/1.1" 304 -\n'
match = fnmatch.fnmatch(api.log.data, expect)
if not match:
self.fail("'{0}' did not match '{1}'".format(api.log.data, expect))
def testIssuesSince(self):
api = github.GitHub("http://127.0.0.8:9898/")
issues = api.issues(since=1499838499)
self.assertEqual(len(issues), 1)
self.assertEqual(issues[0]["number"], "7")
def testWhitelist(self):
api = github.GitHub("http://127.0.0.8:9898/")
whitelist = api.whitelist()
self.assertTrue(len(whitelist) > 0)
self.assertEqual(whitelist, set(["one", "two", "three"]))
self.assertNotIn("four", whitelist)
self.assertNotIn("", whitelist)
def testTeamIdFromName(self):
api = github.GitHub("http://127.0.0.8:9898/")
self.assertEqual(api.teamIdFromName(github.TEAM_CONTRIBUTORS), 1)
self.assertEqual(api.teamIdFromName("Other team"), 2)
self.assertRaises(KeyError, api.teamIdFromName, "team that doesn't exist")
def testLastIssueDelete(self):
api = github.GitHub("http://127.0.0.8:9898/")
self.assertEqual(len(api.issues()), 2)
api.delete("/issues/7")
issues = api.issues()
self.assertEqual(len(issues), 1)
self.assertEqual(issues[0]["number"], "5")
if __name__ == '__main__':
unittest.main()