__init__.py (4179B)
1 import os 2 import json 3 import getpass 4 from datetime import datetime 5 import itertools 6 7 import numpy 8 9 import data 10 11 12 class NumpyEncoder(json.JSONEncoder): 13 def default(self, o): 14 if type(o).__module__ == numpy.__name__: 15 return o.item() 16 super(NumpyEncoder, self).default(o) 17 18 19 class EGJ(object): 20 def save(self, path=getpass.getuser(), append=False): 21 path = os.path.join(data.path, 'visualizer', path) 22 if append: 23 if not os.path.isdir(path): 24 raise ValueError("Can't append to the given directory") 25 name = str(1+max(map(int, filter(str.isdigit, os.listdir(path)))+[-1])) 26 path = os.path.join(path, name) 27 else: 28 while os.path.isdir(path): 29 path = os.path.join(path, '0') 30 31 with open(path, 'w') as f: 32 self.write(f) 33 34 def write(self, file): 35 file.write(json.dumps(self.object(), cls=NumpyEncoder)) 36 37 def type(self): 38 return 'raw' 39 40 def options(self): 41 return [] 42 43 def object(self): 44 return { 45 'type': self.type(), 46 'data': { 47 'type': 'FeatureCollection', 48 'crs': { 49 'type': 'name', 50 'properties': { 51 'name': 'urn:ogc:def:crs:OGC:1.3:CRS84' 52 } 53 }, 54 'features': self.features() 55 } 56 } 57 58 59 class Point(EGJ): 60 def __init__(self, latitude, longitude, info=None): 61 self.latitude = latitude 62 self.longitude = longitude 63 self.info = info 64 65 def features(self): 66 d = { 67 'type': 'Feature', 68 'geometry': { 69 'type': 'Point', 70 'coordinates': [self.longitude, self.latitude] 71 } 72 } 73 if self.info is not None: 74 d['properties'] = { 'info': self.info } 75 return [d] 76 77 78 class Path(EGJ): 79 def __init__(self, path, info=''): 80 self.path = path 81 self.info = info 82 83 def features(self): 84 info = self.info + '''trip_id: %(trip_id)s<br> 85 call_type: %(call_type_f)s<br> 86 origin_call: %(origin_call)d<br> 87 origin_stand: %(origin_stand)d<br> 88 taxi_id: %(taxi_id)d<br> 89 timestamp: %(timestamp_f)s<br> 90 day_type: %(day_type_f)s<br> 91 missing_data: %(missing_data)d<br>''' \ 92 % dict(self.path, 93 call_type_f = ['central', 'stand', 'street'][self.path['call_type']], 94 timestamp_f = datetime.fromtimestamp(self.path['timestamp']).strftime('%c'), 95 day_type_f = ['normal', 'holiday', 'holiday eve'][self.path['day_type']]) 96 97 return [{ 98 'type': 'Feature', 99 'properties': { 100 'info': info, 101 'display': 'path', 102 'timestamp': self.path['timestamp'] 103 }, 104 'geometry': { 105 'type': 'LineString', 106 'coordinates': [[lon, lat] for (lat, lon) in zip(self.path['latitude'], self.path['longitude'])] 107 } 108 }] 109 110 111 class Vlist(EGJ, list): 112 def __init__(self, cluster=False, heatmap=False, distrib=False, *args): 113 list.__init__(self, *args) 114 self.cluster = cluster 115 self.heatmap = heatmap 116 self.distrib = distrib 117 118 def type(self): 119 ts = self.cluster + self.heatmap + self.distrib 120 assert ts <= 1 121 if ts > 0: 122 if all(isinstance(c, Point) for c in self): 123 if self.cluster: 124 return 'cluster' 125 elif self.heatmap: 126 return 'heatmap' 127 elif self.distrib: 128 return 'point distribution' 129 else: 130 raise ValueError('Building a %s with something that is not a Point' % ('cluster' if self.cluster else 'heatmap')) 131 else: 132 return 'raw' 133 134 def features(self): 135 return list(itertools.chain.from_iterable(p.features() for p in self))