import random import re import json import cloudscraper from streamlink.plugin import Plugin, pluginmatcher from streamlink.plugin.api import validate from streamlink.stream import HLSStream _url_re = re.compile(r"http(s)?://(www\.)?camsoda\.com/(?P<username>[^\"\']+)") _api_video_schema = validate.Schema( { "token": str, "edge_servers": [str], "stream_name": str } ) scraper = cloudscraper.create_scraper() headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:134.0) Gecko/20100101 Firefox/134.0", "Referer": "https://www.camsoda.com" } @pluginmatcher(_url_re) class Camsoda(Plugin): API_URL_VIDEO = "https://www.camsoda.com/api/v1/video/vtoken/{0}?username=guest_{1}" HLS_URL_VIDEO_EDGE = "https://{server}/{stream_name}_v1/index.m3u8?token={token}" HLS_URL_VIDEO = "https://{server}/mp4:{stream_name}_aac/playlist.m3u8?token={token}" def _get_api_video(self, username): url = self.API_URL_VIDEO.format(username, str(random.randint(1000, 99999))) response = scraper.get(url, headers=headers) if response.status_code != 200: self.logger.error(f"Failed to get video API data, status code: {response.status_code}") return None try: data_video = response.json() if "status" in data_video and data_video["status"] == 0: self.logger.info(f"Nickname '{username}' is invalid") return None if "edge_servers" in data_video and not data_video["edge_servers"]: if "stream_name" in data_video: if not data_video["stream_name"]: self.logger.info(f"OFFLINE") else: self.logger.info(f"*****PRIVATE*****") return None validate.validate(_api_video_schema, data_video) except Exception as e: self.logger.error(f"Error parsing API response: {e}") return None return data_video def _get_streams(self): match = _url_re.match(self.url) username = match.group("username").replace("/", "") data_video = self._get_api_video(username) if not data_video: return hls_url = self.HLS_URL_VIDEO.format( server=data_video["edge_servers"][0], stream_name=data_video["stream_name"], token=data_video["token"] ) if "edge" in data_video["edge_servers"][0]: hls_url = self.HLS_URL_VIDEO_EDGE.format( server=data_video["edge_servers"][0], stream_name=data_video["stream_name"], token=data_video["token"] ) for s in HLSStream.parse_variant_playlist(self.session, hls_url).items(): yield s __plugin__ = Camsoda
import random import re import json import cloudscraper from streamlink.plugin import Plugin, pluginmatcher from streamlink.plugin.api import validate from streamlink.stream import HLSStream _url_re = re.compile(r"http(s)?://(www\.)?camsoda\.com/(?P<username>[^\"\']+)") _api_video_schema = validate.Schema( { "token": str, "edge_servers": [str], "stream_name": str } ) scraper = cloudscraper.create_scraper() headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:134.0) Gecko/20100101 Firefox/134.0", "Referer": "https://www.camsoda.com" } proxies = { "http": "http://127.0.0.1:2080", "https": "http://127.0.0.1:2080" } @pluginmatcher(_url_re) class Camsoda(Plugin): API_URL_VIDEO = "https://www.camsoda.com/api/v1/video/vtoken/{0}?username=guest_{1}" HLS_URL_VIDEO_EDGE = "https://{server}/{stream_name}_v1/index.m3u8?token={token}" HLS_URL_VIDEO = "https://{server}/mp4:{stream_name}_aac/playlist.m3u8?token={token}" def _get_api_video(self, username): url = self.API_URL_VIDEO.format(username, str(random.randint(1000, 99999))) response = scraper.get(url, headers=headers, proxies=proxies) if response.status_code != 200: self.logger.error(f"Failed to get video API data, status code: {response.status_code}") return None try: data_video = response.json() if "status" in data_video and data_video["status"] == 0: self.logger.info(f"Nickname '{username}' is invalid") return None if "edge_servers" in data_video and not data_video["edge_servers"]: if "stream_name" in data_video: if not data_video["stream_name"]: self.logger.info(f"OFFLINE") else: self.logger.info(f"*****PRIVATE*****") return None validate.validate(_api_video_schema, data_video) except Exception as e: self.logger.error(f"Error parsing API response: {e}") return None return data_video def _get_streams(self): match = _url_re.match(self.url) username = match.group("username").replace("/", "") data_video = self._get_api_video(username) if not data_video: return hls_url = self.HLS_URL_VIDEO.format( server=data_video["edge_servers"][0], stream_name=data_video["stream_name"], token=data_video["token"] ) if "edge" in data_video["edge_servers"][0]: hls_url = self.HLS_URL_VIDEO_EDGE.format( server=data_video["edge_servers"][0], stream_name=data_video["stream_name"], token=data_video["token"] ) for s in HLSStream.parse_variant_playlist(self.session, hls_url).items(): yield s __plugin__ = Camsoda